Skip to content

Commit a2df321

Browse files
committed
perf: optimize coordinator duty runtime
1 parent 4e6f930 commit a2df321

21 files changed

Lines changed: 1361 additions & 126 deletions
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.benchmark.indexing;
21+
22+
import org.apache.druid.java.util.common.DateTimes;
23+
import org.apache.druid.java.util.common.granularity.Granularities;
24+
import org.apache.druid.metadata.IndexerSqlMetadataStorageCoordinatorTestBase;
25+
import org.apache.druid.metadata.MetadataStorageTablesConfig;
26+
import org.apache.druid.metadata.SqlSegmentsMetadataQuery;
27+
import org.apache.druid.metadata.TestDerbyConnector;
28+
import org.apache.druid.segment.TestHelper;
29+
import org.apache.druid.server.coordinator.CreateDataSegments;
30+
import org.apache.druid.server.http.DataSegmentPlus;
31+
import org.apache.druid.timeline.DataSegment;
32+
import org.joda.time.DateTime;
33+
import org.joda.time.Interval;
34+
import org.openjdk.jmh.annotations.Benchmark;
35+
import org.openjdk.jmh.annotations.BenchmarkMode;
36+
import org.openjdk.jmh.annotations.Fork;
37+
import org.openjdk.jmh.annotations.Level;
38+
import org.openjdk.jmh.annotations.Measurement;
39+
import org.openjdk.jmh.annotations.Mode;
40+
import org.openjdk.jmh.annotations.OutputTimeUnit;
41+
import org.openjdk.jmh.annotations.Param;
42+
import org.openjdk.jmh.annotations.Scope;
43+
import org.openjdk.jmh.annotations.Setup;
44+
import org.openjdk.jmh.annotations.State;
45+
import org.openjdk.jmh.annotations.TearDown;
46+
import org.openjdk.jmh.annotations.Warmup;
47+
import org.openjdk.jmh.infra.Blackhole;
48+
49+
import java.util.ArrayList;
50+
import java.util.HashSet;
51+
import java.util.List;
52+
import java.util.Set;
53+
import java.util.concurrent.TimeUnit;
54+
55+
/**
56+
* Benchmarks the per-cycle metadata-store cost of the {@code KillUnusedSegments} coordinator duty against a seeded
57+
* metadata store where only a small fraction of datasources have killable unused segments (as in production).
58+
*
59+
* <ul>
60+
* <li>{@link #oldCycle}: one {@code retrieveUnusedSegmentIntervals} query per datasource across the cluster.</li>
61+
* <li>{@link #newCycle}: a single aggregate query to find the datasources that actually have killable unused
62+
* segments, then a per-datasource query only for those.</li>
63+
* </ul>
64+
*
65+
* Derby runs in-process, so this <em>understates</em> the production benefit: a real metadata store also pays a network
66+
* round-trip per eliminated query.
67+
*/
68+
@State(Scope.Benchmark)
69+
@Fork(value = 1)
70+
@Warmup(iterations = 2, time = 2)
71+
@Measurement(iterations = 5, time = 2)
72+
@BenchmarkMode({Mode.AverageTime})
73+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
74+
public class KillUnusedSegmentsBenchmark
75+
{
76+
private static final DateTime JAN_1 = DateTimes.of("2025-01-01");
77+
private static final String V1 = JAN_1.toString();
78+
private static final DateTime UNUSED_UPDATED_ON = DateTimes.of("2025-02-01");
79+
80+
/** Limit passed to retrieveUnusedSegmentIntervals, mirroring maxSegmentsToKill. */
81+
private static final int KILL_LIMIT = 1000;
82+
private static final int USED_INTERVALS_PER_DS = 5;
83+
private static final int UNUSED_INTERVALS_PER_DS = 5;
84+
85+
/** Total datasources in the cluster (prod ≈ 1,484). */
86+
@Param({"200", "1000"})
87+
private int numDatasources;
88+
89+
/** Percent of datasources that actually have killable unused segments. */
90+
@Param({"2"})
91+
private int percentWithUnused;
92+
93+
private TestDerbyConnector derbyConnector;
94+
private List<String> allDatasources;
95+
private DateTime maxEndTime;
96+
private DateTime maxUsedStatusLastUpdatedTime;
97+
98+
@Setup(Level.Trial)
99+
public void setup()
100+
{
101+
derbyConnector = new TestDerbyConnector();
102+
derbyConnector.createDatabase();
103+
derbyConnector.createSegmentTable();
104+
105+
allDatasources = new ArrayList<>(numDatasources);
106+
final int dsWithUnused = Math.max(1, (numDatasources * percentWithUnused) / 100);
107+
108+
final Set<DataSegmentPlus> toInsert = new HashSet<>();
109+
for (int i = 0; i < numDatasources; i++) {
110+
final String datasource = "ds_" + i;
111+
allDatasources.add(datasource);
112+
113+
// Every datasource has some USED segments so its per-ds unused query must round-trip but returns nothing.
114+
for (DataSegment segment : usedSegments(datasource)) {
115+
toInsert.add(asPlus(segment, true, JAN_1));
116+
}
117+
118+
// A small fraction additionally have killable UNUSED segments.
119+
if (i < dsWithUnused) {
120+
for (DataSegment segment : unusedSegments(datasource)) {
121+
toInsert.add(asPlus(segment, false, UNUSED_UPDATED_ON));
122+
}
123+
}
124+
}
125+
IndexerSqlMetadataStorageCoordinatorTestBase.insertSegments(
126+
toInsert,
127+
false,
128+
derbyConnector,
129+
TestHelper.JSON_MAPPER
130+
);
131+
132+
maxEndTime = JAN_1.plusYears(10);
133+
maxUsedStatusLastUpdatedTime = DateTimes.nowUtc();
134+
}
135+
136+
@TearDown(Level.Trial)
137+
public void tearDown()
138+
{
139+
derbyConnector.tearDown();
140+
}
141+
142+
/**
143+
* Current behavior: one query per datasource.
144+
*/
145+
@Benchmark
146+
public void oldCycle(Blackhole blackhole)
147+
{
148+
for (String datasource : allDatasources) {
149+
blackhole.consume(retrieveUnusedSegmentIntervals(datasource));
150+
}
151+
}
152+
153+
/**
154+
* Optimized: one aggregate query to pre-filter, then query only the matching datasources.
155+
*/
156+
@Benchmark
157+
public void newCycle(Blackhole blackhole)
158+
{
159+
final Set<String> datasourcesWithUnused = retrieveDatasourcesWithUnusedSegments();
160+
blackhole.consume(datasourcesWithUnused);
161+
for (String datasource : allDatasources) {
162+
if (datasourcesWithUnused.contains(datasource)) {
163+
blackhole.consume(retrieveUnusedSegmentIntervals(datasource));
164+
}
165+
}
166+
}
167+
168+
/**
169+
* One read-only transaction per call, mirroring {@code getUnusedSegmentIntervals}.
170+
*/
171+
private List<Interval> retrieveUnusedSegmentIntervals(String datasource)
172+
{
173+
final MetadataStorageTablesConfig tablesConfig = derbyConnector.getMetadataTablesConfig();
174+
return derbyConnector.inReadOnlyTransaction(
175+
(handle, status) -> {
176+
final SqlSegmentsMetadataQuery query =
177+
SqlSegmentsMetadataQuery.forHandle(handle, derbyConnector, tablesConfig, TestHelper.JSON_MAPPER);
178+
return query.retrieveUnusedSegmentIntervals(
179+
datasource,
180+
null,
181+
maxEndTime,
182+
KILL_LIMIT,
183+
maxUsedStatusLastUpdatedTime
184+
);
185+
}
186+
);
187+
}
188+
189+
/**
190+
* The aggregate pre-filter: a single query returning datasources that have at least one killable unused segment.
191+
* Conservative superset of what each per-ds query would find. Exercises the real production method.
192+
*/
193+
private Set<String> retrieveDatasourcesWithUnusedSegments()
194+
{
195+
final MetadataStorageTablesConfig tablesConfig = derbyConnector.getMetadataTablesConfig();
196+
return derbyConnector.inReadOnlyTransaction(
197+
(handle, status) ->
198+
SqlSegmentsMetadataQuery.forHandle(handle, derbyConnector, tablesConfig, TestHelper.JSON_MAPPER)
199+
.retrieveDatasourcesWithUnusedSegments(maxUsedStatusLastUpdatedTime)
200+
);
201+
}
202+
203+
private static List<DataSegment> usedSegments(String datasource)
204+
{
205+
return CreateDataSegments.ofDatasource(datasource)
206+
.forIntervals(USED_INTERVALS_PER_DS, Granularities.DAY)
207+
.startingAt(JAN_1)
208+
.withVersion(V1)
209+
.eachOfSize(1);
210+
}
211+
212+
private static List<DataSegment> unusedSegments(String datasource)
213+
{
214+
return CreateDataSegments.ofDatasource(datasource)
215+
.forIntervals(UNUSED_INTERVALS_PER_DS, Granularities.DAY)
216+
.startingAt(JAN_1.plusDays(USED_INTERVALS_PER_DS))
217+
.withVersion(V1)
218+
.eachOfSize(1);
219+
}
220+
221+
private static DataSegmentPlus asPlus(DataSegment segment, boolean used, DateTime usedStatusLastUpdated)
222+
{
223+
return new DataSegmentPlus(
224+
segment,
225+
JAN_1,
226+
usedStatusLastUpdated,
227+
used,
228+
null,
229+
null,
230+
null,
231+
null
232+
);
233+
}
234+
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.server.coordinator;
21+
22+
import com.google.common.collect.ImmutableMap;
23+
import com.google.common.util.concurrent.ListeningExecutorService;
24+
import com.google.common.util.concurrent.MoreExecutors;
25+
import org.apache.druid.client.ImmutableDruidDataSource;
26+
import org.apache.druid.client.ImmutableDruidServer;
27+
import org.apache.druid.java.util.common.DateTimes;
28+
import org.apache.druid.java.util.common.Intervals;
29+
import org.apache.druid.server.coordination.DruidServerMetadata;
30+
import org.apache.druid.server.coordination.ServerType;
31+
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy;
32+
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
33+
import org.apache.druid.timeline.DataSegment;
34+
import org.joda.time.Interval;
35+
import org.openjdk.jmh.annotations.Benchmark;
36+
import org.openjdk.jmh.annotations.BenchmarkMode;
37+
import org.openjdk.jmh.annotations.Fork;
38+
import org.openjdk.jmh.annotations.Level;
39+
import org.openjdk.jmh.annotations.Measurement;
40+
import org.openjdk.jmh.annotations.Mode;
41+
import org.openjdk.jmh.annotations.OutputTimeUnit;
42+
import org.openjdk.jmh.annotations.Param;
43+
import org.openjdk.jmh.annotations.Scope;
44+
import org.openjdk.jmh.annotations.Setup;
45+
import org.openjdk.jmh.annotations.State;
46+
import org.openjdk.jmh.annotations.TearDown;
47+
import org.openjdk.jmh.annotations.Warmup;
48+
49+
import java.util.ArrayList;
50+
import java.util.Collections;
51+
import java.util.List;
52+
import java.util.concurrent.TimeUnit;
53+
54+
/**
55+
* Benchmarks {@link CostBalancerStrategy#computePlacementCost}, the per-server placement-cost computation the balancer
56+
* invokes for every candidate server when placing a segment. The cost of a single call scales with the number of
57+
* interval buckets the server's segments occupy, so {@code historyDays} (the span of daily intervals held by the
58+
* server) is the primary parameter to vary.
59+
*/
60+
@State(Scope.Benchmark)
61+
@BenchmarkMode(Mode.AverageTime)
62+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
63+
@Warmup(iterations = 3, time = 2)
64+
@Measurement(iterations = 5, time = 2)
65+
@Fork(1)
66+
public class ComputePlacementCostBenchmark
67+
{
68+
private static final long DAY_MILLIS = TimeUnit.DAYS.toMillis(1);
69+
private static final long T0 = DateTimes.of("2026-01-01T00:00:00Z").getMillis();
70+
private static final String DATASOURCE = "ds0";
71+
72+
/** Span of contiguous daily intervals held by the server. */
73+
@Param({"180", "730", "3650"})
74+
private int historyDays;
75+
76+
private ListeningExecutorService exec;
77+
private ExposedCostBalancerStrategy strategy;
78+
private ServerHolder server;
79+
private DataSegment proposalSegment;
80+
81+
@Setup(Level.Trial)
82+
public void setup()
83+
{
84+
exec = MoreExecutors.newDirectExecutorService();
85+
strategy = new ExposedCostBalancerStrategy(exec);
86+
87+
final List<DataSegment> segments = new ArrayList<>(historyDays);
88+
for (int day = 0; day < historyDays; day++) {
89+
final long start = T0 - (long) (day + 1) * DAY_MILLIS;
90+
segments.add(createSegment(Intervals.utc(start, start + DAY_MILLIS)));
91+
}
92+
93+
final ImmutableDruidServer immutableServer = new ImmutableDruidServer(
94+
new DruidServerMetadata("server", "host", null, 1L << 40, null, ServerType.HISTORICAL, "_default_tier", 0),
95+
0L,
96+
ImmutableMap.of(DATASOURCE, new ImmutableDruidDataSource(DATASOURCE, Collections.emptyMap(), segments)),
97+
segments.size()
98+
);
99+
server = new ServerHolder(immutableServer, new TestLoadQueuePeon());
100+
101+
proposalSegment = createSegment(Intervals.utc(T0 - DAY_MILLIS, T0));
102+
}
103+
104+
@TearDown(Level.Trial)
105+
public void tearDown()
106+
{
107+
exec.shutdownNow();
108+
}
109+
110+
@Benchmark
111+
public double computePlacementCost()
112+
{
113+
return strategy.computePlacementCost(proposalSegment, server);
114+
}
115+
116+
private static DataSegment createSegment(Interval interval)
117+
{
118+
return new DataSegment(
119+
DATASOURCE,
120+
interval,
121+
"v1",
122+
Collections.emptyMap(),
123+
Collections.emptyList(),
124+
Collections.emptyList(),
125+
null,
126+
0,
127+
1
128+
);
129+
}
130+
131+
/**
132+
* Exposes the protected {@link CostBalancerStrategy#computePlacementCost} so the benchmark exercises the production
133+
* implementation directly.
134+
*/
135+
private static class ExposedCostBalancerStrategy extends CostBalancerStrategy
136+
{
137+
ExposedCostBalancerStrategy(ListeningExecutorService exec)
138+
{
139+
super(exec);
140+
}
141+
142+
@Override
143+
public double computePlacementCost(DataSegment proposalSegment, ServerHolder server)
144+
{
145+
return super.computePlacementCost(proposalSegment, server);
146+
}
147+
}
148+
}

0 commit comments

Comments
 (0)