Skip to content

Commit fc83ff7

Browse files
committed
perf: optimize coordinator duty runtime
1 parent 4e6f930 commit fc83ff7

15 files changed

Lines changed: 915 additions & 117 deletions
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.server.coordinator;
21+
22+
import com.google.common.collect.ImmutableMap;
23+
import com.google.common.util.concurrent.ListeningExecutorService;
24+
import com.google.common.util.concurrent.MoreExecutors;
25+
import org.apache.druid.client.ImmutableDruidDataSource;
26+
import org.apache.druid.client.ImmutableDruidServer;
27+
import org.apache.druid.java.util.common.DateTimes;
28+
import org.apache.druid.java.util.common.Intervals;
29+
import org.apache.druid.server.coordination.DruidServerMetadata;
30+
import org.apache.druid.server.coordination.ServerType;
31+
import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy;
32+
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
33+
import org.apache.druid.timeline.DataSegment;
34+
import org.joda.time.Interval;
35+
import org.openjdk.jmh.annotations.Benchmark;
36+
import org.openjdk.jmh.annotations.BenchmarkMode;
37+
import org.openjdk.jmh.annotations.Fork;
38+
import org.openjdk.jmh.annotations.Level;
39+
import org.openjdk.jmh.annotations.Measurement;
40+
import org.openjdk.jmh.annotations.Mode;
41+
import org.openjdk.jmh.annotations.OutputTimeUnit;
42+
import org.openjdk.jmh.annotations.Param;
43+
import org.openjdk.jmh.annotations.Scope;
44+
import org.openjdk.jmh.annotations.Setup;
45+
import org.openjdk.jmh.annotations.State;
46+
import org.openjdk.jmh.annotations.TearDown;
47+
import org.openjdk.jmh.annotations.Warmup;
48+
49+
import java.util.ArrayList;
50+
import java.util.Collections;
51+
import java.util.List;
52+
import java.util.concurrent.TimeUnit;
53+
54+
/**
55+
* Benchmarks {@link CostBalancerStrategy#computePlacementCost}, the per-server placement-cost computation the balancer
56+
* invokes for every candidate server when placing a segment. The cost of a single call scales with the number of
57+
* interval buckets the server's segments occupy, so {@code historyDays} (the span of daily intervals held by the
58+
* server) is the primary parameter to vary.
59+
*/
60+
@State(Scope.Benchmark)
61+
@BenchmarkMode(Mode.AverageTime)
62+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
63+
@Warmup(iterations = 3, time = 2)
64+
@Measurement(iterations = 5, time = 2)
65+
@Fork(1)
66+
public class ComputePlacementCostBenchmark
67+
{
68+
private static final long DAY_MILLIS = TimeUnit.DAYS.toMillis(1);
69+
private static final long T0 = DateTimes.of("2026-01-01T00:00:00Z").getMillis();
70+
private static final String DATASOURCE = "ds0";
71+
72+
/** Span of contiguous daily intervals held by the server. */
73+
@Param({"180", "730", "3650"})
74+
private int historyDays;
75+
76+
private ListeningExecutorService exec;
77+
private ExposedCostBalancerStrategy strategy;
78+
private ServerHolder server;
79+
private DataSegment proposalSegment;
80+
81+
@Setup(Level.Trial)
82+
public void setup()
83+
{
84+
exec = MoreExecutors.newDirectExecutorService();
85+
strategy = new ExposedCostBalancerStrategy(exec);
86+
87+
final List<DataSegment> segments = new ArrayList<>(historyDays);
88+
for (int day = 0; day < historyDays; day++) {
89+
final long start = T0 - (long) (day + 1) * DAY_MILLIS;
90+
segments.add(createSegment(Intervals.utc(start, start + DAY_MILLIS)));
91+
}
92+
93+
final ImmutableDruidServer immutableServer = new ImmutableDruidServer(
94+
new DruidServerMetadata("server", "host", null, 1L << 40, null, ServerType.HISTORICAL, "_default_tier", 0),
95+
0L,
96+
ImmutableMap.of(DATASOURCE, new ImmutableDruidDataSource(DATASOURCE, Collections.emptyMap(), segments)),
97+
segments.size()
98+
);
99+
server = new ServerHolder(immutableServer, new TestLoadQueuePeon());
100+
101+
proposalSegment = createSegment(Intervals.utc(T0 - DAY_MILLIS, T0));
102+
}
103+
104+
@TearDown(Level.Trial)
105+
public void tearDown()
106+
{
107+
exec.shutdownNow();
108+
}
109+
110+
@Benchmark
111+
public double computePlacementCost()
112+
{
113+
return strategy.computePlacementCost(proposalSegment, server);
114+
}
115+
116+
private static DataSegment createSegment(Interval interval)
117+
{
118+
return new DataSegment(
119+
DATASOURCE,
120+
interval,
121+
"v1",
122+
Collections.emptyMap(),
123+
Collections.emptyList(),
124+
Collections.emptyList(),
125+
null,
126+
0,
127+
1
128+
);
129+
}
130+
131+
/**
132+
* Exposes the protected {@link CostBalancerStrategy#computePlacementCost} so the benchmark exercises the production
133+
* implementation directly.
134+
*/
135+
private static class ExposedCostBalancerStrategy extends CostBalancerStrategy
136+
{
137+
ExposedCostBalancerStrategy(ListeningExecutorService exec)
138+
{
139+
super(exec);
140+
}
141+
142+
@Override
143+
public double computePlacementCost(DataSegment proposalSegment, ServerHolder server)
144+
{
145+
return super.computePlacementCost(proposalSegment, server);
146+
}
147+
}
148+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.server.coordinator;
21+
22+
import com.google.common.collect.ImmutableMap;
23+
import org.apache.druid.client.DataSourcesSnapshot;
24+
import org.apache.druid.client.ImmutableDruidDataSource;
25+
import org.apache.druid.client.ImmutableDruidServer;
26+
import org.apache.druid.java.util.common.DateTimes;
27+
import org.apache.druid.server.coordination.DruidServerMetadata;
28+
import org.apache.druid.server.coordination.ServerType;
29+
import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
30+
import org.apache.druid.server.coordinator.duty.MarkOvershadowedSegmentsAsUnused;
31+
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
32+
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
33+
import org.apache.druid.timeline.DataSegment;
34+
import org.joda.time.DateTime;
35+
import org.joda.time.Interval;
36+
import org.openjdk.jmh.annotations.Benchmark;
37+
import org.openjdk.jmh.annotations.BenchmarkMode;
38+
import org.openjdk.jmh.annotations.Fork;
39+
import org.openjdk.jmh.annotations.Level;
40+
import org.openjdk.jmh.annotations.Measurement;
41+
import org.openjdk.jmh.annotations.Mode;
42+
import org.openjdk.jmh.annotations.OutputTimeUnit;
43+
import org.openjdk.jmh.annotations.Param;
44+
import org.openjdk.jmh.annotations.Scope;
45+
import org.openjdk.jmh.annotations.Setup;
46+
import org.openjdk.jmh.annotations.State;
47+
import org.openjdk.jmh.annotations.Warmup;
48+
49+
import java.util.ArrayList;
50+
import java.util.List;
51+
import java.util.concurrent.TimeUnit;
52+
53+
/**
54+
* Benchmarks the {@code MarkOvershadowedSegmentsAsUnused} coordinator duty's {@code run} method against a cluster where
55+
* most datasources have no overshadowed segments (as in production). The duty builds segment timelines from the served
56+
* segments only for the datasources that actually have overshadowed segments, so the cost is dominated by how many
57+
* datasources are relevant relative to the total served.
58+
*/
59+
@State(Scope.Benchmark)
60+
@BenchmarkMode(Mode.AverageTime)
61+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
62+
@Warmup(iterations = 3, time = 2)
63+
@Measurement(iterations = 5, time = 2)
64+
@Fork(1)
65+
public class MarkOvershadowedSegmentsAsUnusedBenchmark
66+
{
67+
private static final DateTime START = DateTimes.of("2024-01-01");
68+
69+
/** Total datasources served by the cluster. */
70+
@Param({"1000"})
71+
private int numDatasources;
72+
73+
/** Datasources that have overshadowed segments (an older version shadowed by a newer one). */
74+
@Param({"10", "100"})
75+
private int relevantDatasources;
76+
77+
@Param({"4"})
78+
private int intervalsPerDatasource;
79+
80+
private MarkOvershadowedSegmentsAsUnused duty;
81+
private DruidCoordinatorRuntimeParams params;
82+
83+
@Setup(Level.Trial)
84+
public void setup()
85+
{
86+
final List<DataSegment> usedSegments = new ArrayList<>();
87+
final ImmutableMap.Builder<String, ImmutableDruidDataSource> dataSources = ImmutableMap.builder();
88+
89+
for (int d = 0; d < numDatasources; d++) {
90+
final String datasource = "ds_" + d;
91+
final boolean hasOvershadowed = d < relevantDatasources;
92+
final List<DataSegment> dsSegments = new ArrayList<>();
93+
for (int i = 0; i < intervalsPerDatasource; i++) {
94+
final Interval interval = new Interval(START.plusDays(i), START.plusDays(i + 1));
95+
dsSegments.add(segment(datasource, interval, "v1"));
96+
if (hasOvershadowed) {
97+
// A newer version overshadows the v1 segment for the same interval.
98+
dsSegments.add(segment(datasource, interval, "v2"));
99+
}
100+
}
101+
usedSegments.addAll(dsSegments);
102+
dataSources.put(datasource, new ImmutableDruidDataSource(datasource, ImmutableMap.of(), dsSegments));
103+
}
104+
105+
final ImmutableDruidServer server = new ImmutableDruidServer(
106+
new DruidServerMetadata("server", "host", null, 1L << 40, null, ServerType.HISTORICAL, "_default_tier", 0),
107+
0L,
108+
dataSources.build(),
109+
usedSegments.size()
110+
);
111+
final DruidCluster cluster = DruidCluster
112+
.builder()
113+
.add(new ServerHolder(server, new TestLoadQueuePeon()))
114+
.build();
115+
116+
params = DruidCoordinatorRuntimeParams
117+
.builder()
118+
.withDataSourcesSnapshot(DataSourcesSnapshot.fromUsedSegments(usedSegments))
119+
.withDruidCluster(cluster)
120+
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMarkSegmentAsUnusedDelayMillis(0).build())
121+
.withBalancerStrategy(new RandomBalancerStrategy())
122+
.withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null))
123+
.build();
124+
125+
// A no-op delete handler so the benchmark measures the timeline build + overshadow check, not the metadata write.
126+
duty = new MarkOvershadowedSegmentsAsUnused((datasource, segmentIds) -> segmentIds.size());
127+
}
128+
129+
@Benchmark
130+
public DruidCoordinatorRuntimeParams run()
131+
{
132+
return duty.run(params);
133+
}
134+
135+
private static DataSegment segment(String datasource, Interval interval, String version)
136+
{
137+
return DataSegment.builder()
138+
.dataSource(datasource)
139+
.interval(interval)
140+
.version(version)
141+
.size(1)
142+
.build();
143+
}
144+
}

server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,6 +1093,7 @@ public List<Interval> retrieveUnusedSegmentIntervals(String dataSource, int limi
10931093
return intervals.stream().filter(Objects::nonNull).collect(Collectors.toList());
10941094
}
10951095

1096+
10961097
/**
10971098
* Retrieves unused segments that exactly match the given interval.
10981099
*

0 commit comments

Comments
 (0)