From 205af945b2cf6a3c841cd7f6637d6488f6ec6db5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jesse=20Tu=C4=9Flu?= Date: Sat, 30 May 2026 10:02:45 -0700 Subject: [PATCH] perf: optimize coordinator duty runtime --- .../ComputePlacementCostBenchmark.java | 148 ++++++++++++++++ ...OvershadowedSegmentsAsUnusedBenchmark.java | 144 +++++++++++++++ .../metadata/SqlSegmentsMetadataQuery.java | 1 + .../server/coordinator/DruidCoordinator.java | 99 +++++++++-- .../coordinator/SegmentCountsPerInterval.java | 90 +++++++++- .../balancer/CostBalancerStrategy.java | 128 +++++++++++--- .../coordinator/duty/BalanceSegments.java | 36 +++- .../duty/CoordinatorDutyGroup.java | 19 ++ .../duty/KillStalePendingSegments.java | 35 +++- .../coordinator/duty/KillUnusedSegments.java | 45 ++++- .../duty/MarkEternityTombstonesAsUnused.java | 105 +++++++---- .../MarkOvershadowedSegmentsAsUnused.java | 69 +++++++- .../duty/PrepareBalancerAndLoadQueues.java | 60 +++++-- .../server/coordinator/duty/RunRules.java | 52 +++++- .../duty/UnloadUnusedSegments.java | 20 ++- .../loading/SegmentReplicationStatus.java | 101 ++++++++++- .../loading/StrategicSegmentAssigner.java | 10 +- .../loading/SegmentReplicationStatusTest.java | 164 ++++++++++++++++++ 18 files changed, 1193 insertions(+), 133 deletions(-) create mode 100644 benchmarks/src/test/java/org/apache/druid/server/coordinator/ComputePlacementCostBenchmark.java create mode 100644 benchmarks/src/test/java/org/apache/druid/server/coordinator/MarkOvershadowedSegmentsAsUnusedBenchmark.java create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatusTest.java diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/ComputePlacementCostBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/ComputePlacementCostBenchmark.java new file mode 100644 index 000000000000..80c6a0fc8f9f --- /dev/null +++ 
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/ComputePlacementCostBenchmark.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Benchmarks {@link CostBalancerStrategy#computePlacementCost}, the per-server placement-cost computation the balancer
+ * invokes for every candidate server when placing a segment. The cost of a single call scales with the number of
+ * interval buckets the server's segments occupy, so {@code historyDays} (the span of daily intervals held by the
+ * server) is the primary parameter to vary.
+ */
+@State(Scope.Benchmark)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@Warmup(iterations = 3, time = 2)
+@Measurement(iterations = 5, time = 2)
+@Fork(1)
+public class ComputePlacementCostBenchmark
+{
+  private static final long DAY_MILLIS = TimeUnit.DAYS.toMillis(1);
+  private static final long T0 = DateTimes.of("2026-01-01T00:00:00Z").getMillis();
+  private static final String DATASOURCE = "ds0";
+
+  /** Span of contiguous daily intervals held by the server. */
+  @Param({"180", "730", "3650"})
+  private int historyDays;
+
+  private ListeningExecutorService exec;
+  private ExposedCostBalancerStrategy strategy;
+  private ServerHolder server;
+  private DataSegment proposalSegment;
+
+  @Setup(Level.Trial)
+  public void setup()
+  {
+    exec = MoreExecutors.newDirectExecutorService();
+    strategy = new ExposedCostBalancerStrategy(exec);
+
+    // One daily segment per day, going back historyDays days from T0.
+    final List<DataSegment> segments = new ArrayList<>(historyDays);
+    for (int day = 0; day < historyDays; day++) {
+      final long start = T0 - (long) (day + 1) * DAY_MILLIS;
+      segments.add(createSegment(Intervals.utc(start, start + DAY_MILLIS)));
+    }
+
+    final ImmutableDruidServer immutableServer = new ImmutableDruidServer(
+        new DruidServerMetadata("server", "host", null, 1L << 40, null, ServerType.HISTORICAL, "_default_tier", 0),
+        0L,
+        ImmutableMap.of(DATASOURCE, new ImmutableDruidDataSource(DATASOURCE, Collections.emptyMap(), segments)),
+        segments.size()
+    );
+    server = new ServerHolder(immutableServer, new TestLoadQueuePeon());
+
+    // The proposal covers the same interval as the most recent day held by the server.
+    proposalSegment = createSegment(Intervals.utc(T0 - DAY_MILLIS, T0));
+  }
+
+  @TearDown(Level.Trial)
+  public void tearDown()
+  {
+    exec.shutdownNow();
+  }
+
+  @Benchmark
+  public double computePlacementCost()
+  {
+    return strategy.computePlacementCost(proposalSegment, server);
+  }
+
+  private static DataSegment createSegment(Interval interval)
+  {
+    return new DataSegment(
+        DATASOURCE,
+        interval,
+        "v1",
+        Collections.emptyMap(),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        null,
+        0,
+        1
+    );
+  }
+
+  /**
+   * Exposes the protected {@link CostBalancerStrategy#computePlacementCost} so the benchmark exercises the production
+   * implementation directly.
+   */
+  private static class ExposedCostBalancerStrategy extends CostBalancerStrategy
+  {
+    ExposedCostBalancerStrategy(ListeningExecutorService exec)
+    {
+      super(exec);
+    }
+
+    @Override
+    public double computePlacementCost(DataSegment proposalSegment, ServerHolder server)
+    {
+      return super.computePlacementCost(proposalSegment, server);
+    }
+  }
+}
diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/MarkOvershadowedSegmentsAsUnusedBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/MarkOvershadowedSegmentsAsUnusedBenchmark.java
new file mode 100644
index 000000000000..36022bb2a916
--- /dev/null
+++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/MarkOvershadowedSegmentsAsUnusedBenchmark.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.DataSourcesSnapshot;
+import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
+import org.apache.druid.server.coordinator.duty.MarkOvershadowedSegmentsAsUnused;
+import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Benchmarks the {@code MarkOvershadowedSegmentsAsUnused} coordinator duty's {@code run} method against a cluster where
+ * most datasources have no overshadowed segments (as in production). The duty builds segment timelines from the served
+ * segments only for the datasources that actually have overshadowed segments, so the cost is dominated by how many
+ * datasources are relevant relative to the total served.
+ */
+@State(Scope.Benchmark)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 3, time = 2)
+@Measurement(iterations = 5, time = 2)
+@Fork(1)
+public class MarkOvershadowedSegmentsAsUnusedBenchmark
+{
+  private static final DateTime START = DateTimes.of("2024-01-01");
+
+  /** Total datasources served by the cluster. */
+  @Param({"1000"})
+  private int numDatasources;
+
+  /** Datasources that have overshadowed segments (an older version shadowed by a newer one). */
+  @Param({"10", "100"})
+  private int relevantDatasources;
+
+  @Param({"4"})
+  private int intervalsPerDatasource;
+
+  private MarkOvershadowedSegmentsAsUnused duty;
+  private DruidCoordinatorRuntimeParams params;
+
+  @Setup(Level.Trial)
+  public void setup()
+  {
+    final List<DataSegment> usedSegments = new ArrayList<>();
+    final ImmutableMap.Builder<String, ImmutableDruidDataSource> dataSources = ImmutableMap.builder();
+
+    for (int d = 0; d < numDatasources; d++) {
+      final String datasource = "ds_" + d;
+      final boolean hasOvershadowed = d < relevantDatasources;
+      final List<DataSegment> dsSegments = new ArrayList<>();
+      for (int i = 0; i < intervalsPerDatasource; i++) {
+        final Interval interval = new Interval(START.plusDays(i), START.plusDays(i + 1));
+        dsSegments.add(segment(datasource, interval, "v1"));
+        if (hasOvershadowed) {
+          // A newer version overshadows the v1 segment for the same interval.
+          dsSegments.add(segment(datasource, interval, "v2"));
+        }
+      }
+      usedSegments.addAll(dsSegments);
+      dataSources.put(datasource, new ImmutableDruidDataSource(datasource, ImmutableMap.of(), dsSegments));
+    }
+
+    final ImmutableDruidServer server = new ImmutableDruidServer(
+        new DruidServerMetadata("server", "host", null, 1L << 40, null, ServerType.HISTORICAL, "_default_tier", 0),
+        0L,
+        dataSources.build(),
+        usedSegments.size()
+    );
+    final DruidCluster cluster = DruidCluster
+        .builder()
+        .add(new ServerHolder(server, new TestLoadQueuePeon()))
+        .build();
+
+    params = DruidCoordinatorRuntimeParams
+        .builder()
+        .withDataSourcesSnapshot(DataSourcesSnapshot.fromUsedSegments(usedSegments))
+        .withDruidCluster(cluster)
+        .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMarkSegmentAsUnusedDelayMillis(0).build())
+        .withBalancerStrategy(new RandomBalancerStrategy())
+        .withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null))
+        .build();
+
+    // A no-op delete handler so the benchmark measures the timeline build + overshadow check, not the metadata write.
+    duty = new MarkOvershadowedSegmentsAsUnused((datasource, segmentIds) -> segmentIds.size());
+  }
+
+  @Benchmark
+  public DruidCoordinatorRuntimeParams run()
+  {
+    return duty.run(params);
+  }
+
+  private static DataSegment segment(String datasource, Interval interval, String version)
+  {
+    return DataSegment.builder()
+                      .dataSource(datasource)
+                      .interval(interval)
+                      .version(version)
+                      .size(1)
+                      .build();
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index bf4e20c86a63..d7c28d45e276 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -1093,6 +1093,7 @@ public List<Interval> retrieveUnusedSegmentIntervals(String dataSource, int limi
     return intervals.stream().filter(Objects::nonNull).collect(Collectors.toList());
   }
 
+
   /**
    * Retrieves unused segments that exactly match the given interval.
* diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 41237e352997..cd25fd8e8433 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -39,6 +39,7 @@ import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; @@ -782,11 +783,32 @@ private class UpdateReplicationStatus implements CoordinatorDuty @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { + final Stopwatch totalTime = Stopwatch.createStarted(); + + final Stopwatch broadcastTime = Stopwatch.createStarted(); broadcastSegments = params.getBroadcastSegments(); + broadcastTime.stop(); + + final Stopwatch replStatusTime = Stopwatch.createStarted(); segmentReplicationStatus = params.getSegmentReplicationStatus(); + replStatusTime.stop(); + + final Stopwatch cacheUpdateTime = Stopwatch.createStarted(); if (coordinatorSegmentMetadataCache != null) { coordinatorSegmentMetadataCache.updateSegmentReplicationStatus(segmentReplicationStatus); } + cacheUpdateTime.stop(); + + log.info( + "UpdateReplicationStatus summary: broadcastSegments[%,d], hasReplStatus[%s], hasMetaCache[%s];" + + " broadcastMs[%,d], replStatusMs[%,d], cacheUpdateMs[%,d], totalMs[%,d].", + broadcastSegments == null ? 
0 : broadcastSegments.size(), + segmentReplicationStatus != null, + coordinatorSegmentMetadataCache != null, + broadcastTime.millisElapsed(), replStatusTime.millisElapsed(), + cacheUpdateTime.millisElapsed(), totalTime.millisElapsed() + ); + return params; } } @@ -799,27 +821,66 @@ private class CollectSegmentStats implements CoordinatorDuty @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { + final Stopwatch totalTime = Stopwatch.createStarted(); + // Collect stats for unavailable and under-replicated segments final CoordinatorRunStats stats = params.getCoordinatorStats(); - getDatasourceToUnavailableSegmentCount().forEach( - (dataSource, numUnavailable) -> stats.add( - Stats.Segments.UNAVAILABLE, - RowKey.of(Dimension.DATASOURCE, dataSource), - numUnavailable - ) - ); - getTierToDatasourceToUnderReplicatedCount(false).forEach( - (tier, countsPerDatasource) -> countsPerDatasource.forEach( - (dataSource, underReplicatedCount) -> - stats.addToSegmentStat(Stats.Segments.UNDER_REPLICATED, tier, dataSource, underReplicatedCount) - ) - ); - getDatasourceToDeepStorageQueryOnlySegmentCount().forEach( - (dataSource, numDeepStorageOnly) -> stats.add( - Stats.Segments.DEEP_STORAGE_ONLY, - RowKey.of(Dimension.DATASOURCE, dataSource), - numDeepStorageOnly - ) + + final Object2IntMap dsToUnavailable; + final Map> tierToDsToUnderRepl; + final Object2IntMap dsToDeepStorageOnly; + + if (segmentReplicationStatus == null) { + dsToUnavailable = Object2IntMaps.emptyMap(); + tierToDsToUnderRepl = Collections.emptyMap(); + dsToDeepStorageOnly = Object2IntMaps.emptyMap(); + } else { + // Single fused pass replaces three independent full iterations over + // metadataManager.iterateAllUsedSegments(). ignoreMissingServers=true replicates + // the !false inversion inside computeUnderReplicated(segments, false). 
+ final SegmentReplicationStatus.SegmentStatsSnapshot snapshot = segmentReplicationStatus.computeSegmentStats( + metadataManager.iterateAllUsedSegments(), + true + ); + dsToUnavailable = snapshot.getDatasourceToUnavailableCount(); + tierToDsToUnderRepl = snapshot.getTierToDatasourceToUnderReplicatedCount(); + dsToDeepStorageOnly = snapshot.getDatasourceToDeepStorageOnlyCount(); + } + + long unavailableTotal = 0; + for (final Object2IntMap.Entry e : dsToUnavailable.object2IntEntrySet()) { + unavailableTotal += e.getIntValue(); + stats.add(Stats.Segments.UNAVAILABLE, RowKey.of(Dimension.DATASOURCE, e.getKey()), e.getIntValue()); + } + + long underReplTotal = 0; + int tierCount = 0; + for (final Map.Entry> tierEntry : tierToDsToUnderRepl.entrySet()) { + tierCount++; + for (final Object2LongMap.Entry dsEntry : tierEntry.getValue().object2LongEntrySet()) { + underReplTotal += dsEntry.getLongValue(); + stats.addToSegmentStat( + Stats.Segments.UNDER_REPLICATED, + tierEntry.getKey(), + dsEntry.getKey(), + dsEntry.getLongValue() + ); + } + } + + long deepStorageTotal = 0; + for (final Object2IntMap.Entry e : dsToDeepStorageOnly.object2IntEntrySet()) { + deepStorageTotal += e.getIntValue(); + stats.add(Stats.Segments.DEEP_STORAGE_ONLY, RowKey.of(Dimension.DATASOURCE, e.getKey()), e.getIntValue()); + } + + log.info( + "CollectSegmentStats summary: unavailableTotal[%,d] across[%d] ds," + + " underReplicatedTotal[%,d] across[%d] tiers, deepStorageOnly[%,d] across[%d] ds; totalMs[%,d].", + unavailableTotal, dsToUnavailable.size(), + underReplTotal, tierCount, + deepStorageTotal, dsToDeepStorageOnly.size(), + totalTime.millisElapsed() ); return params; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java b/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java index a9767b8f7682..51d7b7c0ff5d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java +++ 
b/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java
@@ -25,8 +25,11 @@
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 
 /**
  * Maintains a count of segments for each datasource and interval.
@@ -38,6 +41,14 @@ public class SegmentCountsPerInterval
   private final Map<String, Object2IntMap<Interval>> datasourceIntervalToSegmentCount = new HashMap<>();
   private final Object2IntMap<Interval> intervalToTotalSegmentCount = new Object2IntOpenHashMap<>();
   private final Object2IntMap<String> datasourceToTotalSegmentCount = new Object2IntOpenHashMap<>();
+  // Indexes of the interval counts above, keyed by interval start millis and end millis respectively.
+  // They are updated together with the count maps in updateIntervalCount().
+  private final NavigableMap<Long, Object2IntMap<Interval>> startMillisToTotalSegmentCount = new TreeMap<>();
+  private final NavigableMap<Long, Object2IntMap<Interval>> endMillisToTotalSegmentCount = new TreeMap<>();
+  private final Map<String, NavigableMap<Long, Object2IntMap<Interval>>> datasourceStartMillisToSegmentCount
+      = new HashMap<>();
+  private final Map<String, NavigableMap<Long, Object2IntMap<Interval>>> datasourceEndMillisToSegmentCount
+      = new HashMap<>();
 
   public void addSegment(DataSegment segment)
   {
@@ -76,13 +87,107 @@ public Object2IntMap<Interval> getIntervalToTotalSegmentCount()
     return intervalToTotalSegmentCount;
   }
 
+  /**
+   * Returns the internal index of total segment counts per interval, keyed by interval start millis.
+   */
+  public NavigableMap<Long, Object2IntMap<Interval>> getStartMillisToTotalSegmentCount()
+  {
+    return startMillisToTotalSegmentCount;
+  }
+
+  /**
+   * Returns the internal index of total segment counts per interval, keyed by interval end millis.
+   */
+  public NavigableMap<Long, Object2IntMap<Interval>> getEndMillisToTotalSegmentCount()
+  {
+    return endMillisToTotalSegmentCount;
+  }
+
+  /**
+   * Returns the internal index of segment counts per interval for the given datasource, keyed by interval
+   * start millis, or an empty map if no index exists for the datasource.
+   */
+  public NavigableMap<Long, Object2IntMap<Interval>> getStartMillisToSegmentCount(String datasource)
+  {
+    return datasourceStartMillisToSegmentCount.getOrDefault(datasource, Collections.emptyNavigableMap());
+  }
+
+  /**
+   * Returns the internal index of segment counts per interval for the given datasource, keyed by interval
+   * end millis, or an empty map if no index exists for the datasource.
+   */
+  public NavigableMap<Long, Object2IntMap<Interval>> getEndMillisToSegmentCount(String datasource)
+  {
+    return datasourceEndMillisToSegmentCount.getOrDefault(datasource, Collections.emptyNavigableMap());
+  }
+
   private void updateCountInInterval(DataSegment segment, int delta)
   {
     totalSegments += delta;
-    intervalToTotalSegmentCount.mergeInt(segment.getInterval(), delta, Integer::sum);
+    updateIntervalCount(
+        intervalToTotalSegmentCount,
+        startMillisToTotalSegmentCount,
+        endMillisToTotalSegmentCount,
+        segment.getInterval(),
+        delta
+    );
     datasourceToTotalSegmentCount.mergeInt(segment.getDataSource(), delta, Integer::sum);
-    datasourceIntervalToSegmentCount
-        .computeIfAbsent(segment.getDataSource(), ds -> new Object2IntOpenHashMap<>())
-        .mergeInt(segment.getInterval(), delta, Integer::sum);
+
+    final String datasource = segment.getDataSource();
+    updateIntervalCount(
+        datasourceIntervalToSegmentCount.computeIfAbsent(datasource, ds -> new Object2IntOpenHashMap<>()),
+        datasourceStartMillisToSegmentCount.computeIfAbsent(datasource, ds -> new TreeMap<>()),
+        datasourceEndMillisToSegmentCount.computeIfAbsent(datasource, ds -> new TreeMap<>()),
+        segment.getInterval(),
+        delta
+    );
+  }
+
+  /**
+   * Adds {@code delta} to the count of the given interval in the count map and mirrors the updated
+   * count into both millis indexes. An interval whose count becomes zero is removed from all three.
+   */
+  private static void updateIntervalCount(
+      Object2IntMap<Interval> intervalToSegmentCount,
+      NavigableMap<Long, Object2IntMap<Interval>> startMillisToSegmentCount,
+      NavigableMap<Long, Object2IntMap<Interval>> endMillisToSegmentCount,
+      Interval interval,
+      int delta
+  )
+  {
+    final int updatedCount = intervalToSegmentCount.getInt(interval) + delta;
+    if (updatedCount == 0) {
+      intervalToSegmentCount.removeInt(interval);
+      removeFromIntervalIndex(startMillisToSegmentCount, interval.getStartMillis(), interval);
+      removeFromIntervalIndex(endMillisToSegmentCount, interval.getEndMillis(), interval);
+    } else {
+      intervalToSegmentCount.put(interval, updatedCount);
+      startMillisToSegmentCount
+          .computeIfAbsent(interval.getStartMillis(), startMillis -> new Object2IntOpenHashMap<>())
+          .put(interval, updatedCount);
+      endMillisToSegmentCount
+          .computeIfAbsent(interval.getEndMillis(), endMillis -> new Object2IntOpenHashMap<>())
+          .put(interval, updatedCount);
+    }
+  }
+
+  /**
+   * Removes the interval from the bucket at {@code millis}, dropping the bucket once it is empty.
+   */
+  private static void removeFromIntervalIndex(
+      NavigableMap<Long, Object2IntMap<Interval>> intervalIndex,
+      long millis,
+      Interval interval
+  )
+  {
+    final Object2IntMap<Interval> segmentCounts = intervalIndex.get(millis);
+    if (segmentCounts == null) {
+      return;
+    }
+
+    segmentCounts.removeInt(interval);
+    if (segmentCounts.isEmpty()) {
+      intervalIndex.remove(millis);
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
index d9c883bf6c53..dd90dfa02b27 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
@@ -24,7 +24,7 @@
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
-import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import org.apache.commons.math3.util.FastMath;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
@@ -43,10 +43,12 @@
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NavigableMap;
 import java.util.PriorityQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.stream.Collectors;
 
 public class CostBalancerStrategy implements BalancerStrategy
@@ -70,6 +72,13 @@ public class CostBalancerStrategy implements BalancerStrategy
 
   private final CoordinatorRunStats stats = new CoordinatorRunStats();
   private final AtomicLong computeTimeNanos = new AtomicLong(0);
+  private final LongAdder serverOrderingCalls = new LongAdder();
+  private final LongAdder serverCostTasks = new LongAdder();
+  private final LongAdder placementCostComputations = new LongAdder();
+  private final LongAdder totalIntervalEntriesScanned = new LongAdder();
+  private final LongAdder datasourceIntervalEntriesScanned = new LongAdder();
+  private final LongAdder totalIntervalEntriesMatched = new LongAdder();
+  private final LongAdder datasourceIntervalEntriesMatched = new LongAdder();
 
   public static double computeJointSegmentsCost(DataSegment segment, Iterable<DataSegment> segmentSet)
   {
@@ -267,10 +276,34 @@ public Iterator<ServerHolder> findServersToDropSegment(
   @Override
   public CoordinatorRunStats getStats()
   {
+    final long computationTimeMs = TimeUnit.NANOSECONDS.toMillis(computeTimeNanos.getAndSet(0));
     stats.add(
         Stats.Balancer.COMPUTATION_TIME,
-        TimeUnit.NANOSECONDS.toMillis(computeTimeNanos.getAndSet(0))
+        computationTimeMs
     );
+
+    // Drain the per-run counters so that each summary covers only the work done since the previous call.
+    final long orderingCalls = serverOrderingCalls.sumThenReset();
+    final long costTasks = serverCostTasks.sumThenReset();
+    final long costComputations = placementCostComputations.sumThenReset();
+    final long totalScanned = totalIntervalEntriesScanned.sumThenReset();
+    final long datasourceScanned = datasourceIntervalEntriesScanned.sumThenReset();
+    final long totalMatched = totalIntervalEntriesMatched.sumThenReset();
+    final long datasourceMatched = datasourceIntervalEntriesMatched.sumThenReset();
+    if (costComputations > 0) {
+      log.info(
+          "CostBalancerStrategy summary: serverOrderings[%,d], serverCostTasks[%,d],"
+          + " serverCostComputations[%,d], orderWallMs[%,d];"
+          + " intervalEntriesScanned[total=%,d, datasource=%,d],"
+          + " intervalEntriesMatched[total=%,d, datasource=%,d],"
+          + " avgEntriesScannedPerComputation[total=%.1f, datasource=%.1f],"
+          + " avgEntriesMatchedPerComputation[total=%.1f, datasource=%.1f].",
+          orderingCalls, costTasks, costComputations, computationTimeMs,
+          totalScanned, datasourceScanned, totalMatched, datasourceMatched,
+          average(totalScanned, costComputations), average(datasourceScanned, costComputations),
+          average(totalMatched, costComputations), average(datasourceMatched, costComputations)
+      );
+    }
     return stats;
   }
 
@@ -280,34 +313,62 @@ public CoordinatorRunStats getStats()
   protected double computePlacementCost(DataSegment proposalSegment, ServerHolder server)
   {
     final Interval costComputeInterval = getCostComputeInterval(proposalSegment);
-
-    // Compute number of segments in each interval
-    final Object2IntOpenHashMap<Interval> intervalToSegmentCount = new Object2IntOpenHashMap<>();
-
     final SegmentCountsPerInterval projectedSegments = server.getProjectedSegmentCounts();
-    projectedSegments.getIntervalToTotalSegmentCount().object2IntEntrySet().forEach(entry -> {
-      final Interval interval = entry.getKey();
-      if (costComputeInterval.overlaps(interval)) {
-        intervalToSegmentCount.addTo(interval, entry.getIntValue());
+
+    final Interval segmentInterval = proposalSegment.getInterval();
+    double cost = 0;
+    int totalScanned = 0;
+    int datasourceScanned = 0;
+    int totalMatched = 0;
+    int datasourceMatched = 0;
+
+    final NavigableMap<Long, Object2IntMap<Interval>> totalCountsByStart =
+        projectedSegments.getStartMillisToTotalSegmentCount();
+    final NavigableMap<Long, Object2IntMap<Interval>> totalCountsByEnd =
+        projectedSegments.getEndMillisToTotalSegmentCount();
+    final Iterable<Object2IntMap<Interval>> totalCandidateBuckets =
+        shouldScanByStart(totalCountsByStart, totalCountsByEnd, costComputeInterval)
+        ? totalCountsByStart.headMap(costComputeInterval.getEndMillis(), false).values()
+        : totalCountsByEnd.tailMap(costComputeInterval.getStartMillis(), false).values();
+    // The chosen index only bounds candidates on one side, so overlaps() remains the exact filter.
+    for (Object2IntMap<Interval> intervalCounts : totalCandidateBuckets) {
+      for (Object2IntMap.Entry<Interval> entry : intervalCounts.object2IntEntrySet()) {
+        ++totalScanned;
+        final Interval interval = entry.getKey();
+        if (costComputeInterval.overlaps(interval)) {
+          // Cost contribution from all segments on the server.
+          cost += intervalCost(segmentInterval, interval) * entry.getIntValue();
+          ++totalMatched;
+        }
       }
-    });
+    }
 
     // Count the segments for the same datasource twice as they have twice the cost
     final String datasource = proposalSegment.getDataSource();
-    projectedSegments.getIntervalToSegmentCount(datasource).object2IntEntrySet().forEach(entry -> {
-      final Interval interval = entry.getKey();
-      if (costComputeInterval.overlaps(interval)) {
-        intervalToSegmentCount.addTo(interval, entry.getIntValue());
+    final NavigableMap<Long, Object2IntMap<Interval>> datasourceCountsByStart =
+        projectedSegments.getStartMillisToSegmentCount(datasource);
+    final NavigableMap<Long, Object2IntMap<Interval>> datasourceCountsByEnd =
+        projectedSegments.getEndMillisToSegmentCount(datasource);
+    final Iterable<Object2IntMap<Interval>> datasourceCandidateBuckets =
+        shouldScanByStart(datasourceCountsByStart, datasourceCountsByEnd, costComputeInterval)
+        ? datasourceCountsByStart.headMap(costComputeInterval.getEndMillis(), false).values()
+        : datasourceCountsByEnd.tailMap(costComputeInterval.getStartMillis(), false).values();
+    for (Object2IntMap<Interval> intervalCounts : datasourceCandidateBuckets) {
+      for (Object2IntMap.Entry<Interval> entry : intervalCounts.object2IntEntrySet()) {
+        ++datasourceScanned;
+        final Interval interval = entry.getKey();
+        if (costComputeInterval.overlaps(interval)) {
+          cost += intervalCost(segmentInterval, interval) * entry.getIntValue();
+          ++datasourceMatched;
+        }
       }
-    });
+    }
 
-    // Compute joint cost for each interval
-    double cost = 0;
-    final Interval segmentInterval = proposalSegment.getInterval();
-    cost += intervalToSegmentCount.object2IntEntrySet().stream().mapToDouble(
-        entry -> intervalCost(segmentInterval, entry.getKey())
-                 * entry.getIntValue()
-    ).sum();
+    placementCostComputations.increment();
+    totalIntervalEntriesScanned.add(totalScanned);
+    datasourceIntervalEntriesScanned.add(datasourceScanned);
+    totalIntervalEntriesMatched.add(totalMatched);
+    datasourceIntervalEntriesMatched.add(datasourceMatched);
 
     // Minus the self cost of the segment
     if (server.isProjectedSegment(proposalSegment)) {
@@ -328,6 +389,8 @@ private List<ServerHolder> orderServersByPlacementCost(
   {
     final Stopwatch computeTime = Stopwatch.createStarted();
     final List<ListenableFuture<Pair<Double, ServerHolder>>> futures = new ArrayList<>();
+    serverOrderingCalls.increment();
+    serverCostTasks.add(serverHolders.size());
     for (ServerHolder server : serverHolders) {
       futures.add(
           exec.submit(
@@ -404,5 +467,34 @@ private static Interval getCostComputeInterval(DataSegment segment)
     }
   }
 
-}
+  /**
+   * Decides which index to scan for candidate intervals: the start-millis index truncated at the end of
+   * {@code overlapInterval}, or the end-millis index truncated at its start. Picks the side whose key range
+   * spans less time, which is a cheap proxy for (not a guarantee of) fewer entries to scan.
+   *
+   * @return true if the start-millis index should be scanned, or if either index is empty
+   */
+  private static boolean shouldScanByStart(
+      NavigableMap<Long, Object2IntMap<Interval>> countsByStart,
+      NavigableMap<Long, Object2IntMap<Interval>> countsByEnd,
+      Interval overlapInterval
+  )
+  {
+    if (countsByStart.isEmpty() || countsByEnd.isEmpty()) {
+      return true;
+    }
+
+    final double timeFromFirstStartToOverlapEnd = (double) overlapInterval.getEndMillis() - countsByStart.firstKey();
+    final double timeFromOverlapStartToLastEnd = countsByEnd.lastKey() - (double) overlapInterval.getStartMillis();
+    return timeFromFirstStartToOverlapEnd <= timeFromOverlapStartToLastEnd;
+  }
+
+  /**
+   * Returns {@code numerator / denominator} as a double, or 0 if the denominator is 0.
+   */
+  private static double average(long numerator, long denominator)
+  {
+    return denominator == 0 ?
0 : ((double) numerator) / denominator; + } + +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java index e3791c9dc233..e7d8855ee4a0 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java @@ -20,6 +20,7 @@ package org.apache.druid.server.coordinator.duty; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.DruidCluster; @@ -50,25 +51,49 @@ public BalanceSegments(Duration coordinatorPeriod) @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { + final Stopwatch totalTime = Stopwatch.createStarted(); + if (params.getUsedSegmentCount() <= 0) { + log.info("BalanceSegments skipped: usedSegmentCount[%,d].", params.getUsedSegmentCount()); return params; } - final int maxSegmentsToMove = getMaxSegmentsToMove(params); + final Pair clusterShape = getNumHistoricalsAndSegments(params.getDruidCluster()); + + final int maxSegmentsToMove = getMaxSegmentsToMove(params, clusterShape); params.getCoordinatorStats().add(Stats.Balancer.MAX_TO_MOVE, maxSegmentsToMove); if (maxSegmentsToMove <= 0) { + log.info("BalanceSegments skipped: maxSegmentsToMove[%d].", maxSegmentsToMove); return params; } + final Stopwatch tierBalanceTime = Stopwatch.createStarted(); + final int[] tierCount = {0}; params.getDruidCluster().getManagedHistoricals().forEach( - (tier, servers) -> new TierSegmentBalancer(tier, servers, maxSegmentsToMove, params).run() + (tier, servers) -> { + tierCount[0]++; + final Stopwatch tierTime = Stopwatch.createStarted(); + new TierSegmentBalancer(tier, servers, 
maxSegmentsToMove, params).run(); + log.info( + "BalanceSegments tier[%s]: servers[%d], elapsedMs[%,d].", + tier, servers.size(), tierTime.millisElapsed() + ); + } ); + tierBalanceTime.stop(); CoordinatorRunStats runStats = params.getCoordinatorStats(); params.getBalancerStrategy() .getStats() .forEachStat(runStats::add); + log.info( + "BalanceSegments summary: maxSegmentsToMove[%,d], tiers[%d], historicals[%d], totalSegmentsInCluster[%,d];" + + " tierBalanceMs[%,d], totalMs[%,d].", + maxSegmentsToMove, tierCount[0], clusterShape.lhs, clusterShape.rhs, + tierBalanceTime.millisElapsed(), totalTime.millisElapsed() + ); + return params; } @@ -78,19 +103,18 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) * number of segments picked for moving is determined by the {@link TierSegmentBalancer} * based on the level of skew in the tier. */ - private int getMaxSegmentsToMove(DruidCoordinatorRuntimeParams params) + private int getMaxSegmentsToMove(DruidCoordinatorRuntimeParams params, Pair clusterShape) { final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig(); if (dynamicConfig.isSmartSegmentLoading()) { - final Pair numHistoricalsAndSegments = getNumHistoricalsAndSegments(params.getDruidCluster()); - final int totalSegmentsInCluster = numHistoricalsAndSegments.rhs; + final int totalSegmentsInCluster = clusterShape.rhs; final int numBalancerThreads = params.getSegmentLoadingConfig().getBalancerComputeThreads(); final int maxSegmentsToMove = SegmentToMoveCalculator .computeMaxSegmentsToMovePerTier(totalSegmentsInCluster, numBalancerThreads, coordinatorPeriod); log.debug( "Computed maxSegmentsToMove[%,d] for total [%,d] segments on [%d] historicals.", - maxSegmentsToMove, totalSegmentsInCluster, numHistoricalsAndSegments.lhs + maxSegmentsToMove, totalSegmentsInCluster, clusterShape.lhs ); return maxSegmentsToMove; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyGroup.java 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyGroup.java index 44b98061642b..109bcd1aeb23 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyGroup.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyGroup.java @@ -109,15 +109,29 @@ public void run(DruidCoordinatorRuntimeParams params) markRunStarted(); final CoordinatorRunStats stats = params.getCoordinatorStats(); + final StringBuilder cycleSummary = new StringBuilder(); + long cycleTotalMs = 0; for (CoordinatorDuty duty : duties) { if (coordinator.isLeader()) { final Stopwatch dutyRunTime = Stopwatch.createStarted(); params = duty.run(params); dutyRunTime.stop(); + final long elapsed = dutyRunTime.millisElapsed(); + cycleTotalMs += elapsed; final String dutyName = duty.getClass().getName(); + final String shortName = dutyName.substring(dutyName.lastIndexOf('.') + 1).replace('$', '.'); + if (cycleSummary.length() > 0) { + cycleSummary.append(", "); + } + cycleSummary.append(shortName).append('=').append(elapsed).append("ms"); + if (params == null) { log.warn("Stopping run for group[%s] on request of duty[%s].", name, dutyName); + log.info( + "DutyGroup[%s] cycle (aborted at duty[%s]) timings: total=%,dms; %s", + name, dutyName, cycleTotalMs, cycleSummary + ); return; } else { stats.add( @@ -129,6 +143,11 @@ public void run(DruidCoordinatorRuntimeParams params) } } + log.info( + "DutyGroup[%s] cycle timings: total=%,dms; %s", + name, cycleTotalMs, cycleSummary + ); + // Emit stats collected from all duties if (stats.rowCount() > 0) { stats.forEachStat(this::emitStat); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegments.java index 3f1a84703fe8..175be2310f72 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegments.java +++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillStalePendingSegments.java @@ -23,6 +23,7 @@ import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; @@ -66,16 +67,28 @@ public KillStalePendingSegments(OverlordClient overlordClient) @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { + final Stopwatch totalTime = Stopwatch.createStarted(); + final Set killDatasources = new HashSet<>( params.getUsedSegmentsTimelinesPerDataSource().keySet() ); + final int candidates = killDatasources.size(); killDatasources.removeAll( params.getCoordinatorDynamicConfig() .getDataSourcesToNotKillStalePendingSegmentsIn() ); + final Stopwatch minCreatedTime0 = Stopwatch.createStarted(); final DateTime minCreatedTime = getMinCreatedTimeToRetain(); + minCreatedTime0.stop(); + + final Stopwatch killTime = Stopwatch.createStarted(); + long totalKilled = 0; + int datasourcesWithKills = 0; + long maxCallMs = 0; + String slowestDs = null; for (String dataSource : killDatasources) { + final Stopwatch perCall = Stopwatch.createStarted(); int pendingSegmentsKilled = FutureUtils.getUnchecked( overlordClient.killPendingSegments( dataSource, @@ -83,10 +96,17 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) ), true ); + final long elapsed = perCall.millisElapsed(); + if (elapsed > maxCallMs) { + maxCallMs = elapsed; + slowestDs = dataSource; + } if (pendingSegmentsKilled > 0) { + datasourcesWithKills++; + totalKilled += pendingSegmentsKilled; log.info( - "Killed [%d] pendingSegments created before [%s] for datasource[%s].", - pendingSegmentsKilled, minCreatedTime, dataSource + "Killed 
[%d] pendingSegments created before [%s] for datasource[%s] in [%,d]ms.", + pendingSegmentsKilled, minCreatedTime, dataSource, elapsed ); params.getCoordinatorStats().add( Stats.Kill.PENDING_SEGMENTS, @@ -95,6 +115,17 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) ); } } + killTime.stop(); + + log.info( + "KillStalePendingSegments summary: candidates[%d], kept[%d] (skipped via config), datasourcesWithKills[%d]," + + " totalKilled[%,d], slowestDs[%s] at [%,d]ms;" + + " minCreatedTimeMs[%,d], killLoopMs[%,d], totalMs[%,d].", + candidates, candidates - killDatasources.size(), datasourcesWithKills, + totalKilled, slowestDs, maxCallMs, + minCreatedTime0.millisElapsed(), killTime.millisElapsed(), totalTime.millisElapsed() + ); + return params; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java index 26b34a368458..f9adb0f11039 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; @@ -155,21 +156,30 @@ public DruidCoordinatorRuntimeParams run(final DruidCoordinatorRuntimeParams par private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeParams params) { + final Stopwatch totalTime = Stopwatch.createStarted(); final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig(); final CoordinatorRunStats stats 
= params.getCoordinatorStats(); + final Stopwatch slotsTime = Stopwatch.createStarted(); final int availableKillTaskSlots = getAvailableKillTaskSlots(dynamicConfig, stats); + slotsTime.stop(); if (availableKillTaskSlots <= 0) { - log.debug("Skipping KillUnusedSegments because there are no available kill task slots."); + log.info( + "KillUnusedSegments skipped: no available kill task slots." + + " slotsLookupMs[%,d], totalMs[%,d].", + slotsTime.millisElapsed(), totalTime.millisElapsed() + ); return params; } + final Stopwatch dsListTime = Stopwatch.createStarted(); final Set dataSourcesToKill; if (CollectionUtils.isNullOrEmpty(dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn())) { dataSourcesToKill = storageCoordinator.retrieveAllDatasourceNames(); } else { dataSourcesToKill = dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn(); } + dsListTime.stop(); if (datasourceCircularKillList == null || !datasourceCircularKillList.equalsSet(dataSourcesToKill)) { @@ -178,11 +188,21 @@ private DruidCoordinatorRuntimeParams runInternal(final DruidCoordinatorRuntimeP lastKillTime = DateTimes.nowUtc(); + final Stopwatch killLoopTime = Stopwatch.createStarted(); killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats); + killLoopTime.stop(); // any datasources that are no longer being considered for kill should have their // last kill interval removed from map. 
datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill); + + log.info( + "KillUnusedSegments summary: availableSlots[%d], datasourcesToKill[%d];" + + " slotsLookupMs[%,d], dsListMs[%,d], killLoopMs[%,d], totalMs[%,d].", + availableKillTaskSlots, dataSourcesToKill.size(), + slotsTime.millisElapsed(), dsListTime.millisElapsed(), + killLoopTime.millisElapsed(), totalTime.millisElapsed() + ); return params; } @@ -204,6 +224,12 @@ private void killUnusedSegments( final Set remainingDatasourcesToKill = new HashSet<>(dataSourcesToKill); int submittedTasks = 0; + long findIntervalTotalMs = 0; + long submitTotalMs = 0; + long maxFindIntervalMs = 0; + String slowestFindDs = null; + int dsScanned = 0; + int dsWithNoInterval = 0; for (String dataSource : datasourceCircularKillList) { if (dataSource.equals(prevDatasourceKilled) && remainingDatasourcesToKill.size() > 1) { // Skip this dataSource if it's the same as the previous one and there are remaining datasources to kill. @@ -213,9 +239,18 @@ private void killUnusedSegments( remainingDatasourcesToKill.remove(dataSource); } + dsScanned++; final DateTime maxUsedStatusLastUpdatedTime = DateTimes.nowUtc().minus(bufferPeriod); + final Stopwatch findIntervalSw = Stopwatch.createStarted(); final Interval intervalToKill = findIntervalForKill(dataSource, maxUsedStatusLastUpdatedTime, stats); + final long findMs = findIntervalSw.millisElapsed(); + findIntervalTotalMs += findMs; + if (findMs > maxFindIntervalMs) { + maxFindIntervalMs = findMs; + slowestFindDs = dataSource; + } if (intervalToKill == null) { + dsWithNoInterval++; datasourceToLastKillIntervalEnd.remove(dataSource); // If no interval is found for this datasource, either terminate or continue based on remaining datasources to kill. 
if (remainingDatasourcesToKill.isEmpty()) { @@ -225,6 +260,7 @@ private void killUnusedSegments( } try { + final Stopwatch submitSw = Stopwatch.createStarted(); FutureUtils.getUnchecked( overlordClient.runKillTask( TASK_ID_PREFIX, @@ -236,6 +272,7 @@ private void killUnusedSegments( ), true ); + submitTotalMs += submitSw.millisElapsed(); ++submittedTasks; datasourceToLastKillIntervalEnd.put(dataSource, intervalToKill.getEnd()); @@ -261,6 +298,12 @@ private void killUnusedSegments( remainingDatasourcesToKill.size(), remainingDatasourcesToKill ); + log.info( + "KillUnusedSegments killLoop: dsScanned[%d], dsWithNoInterval[%d], submitted[%d];" + + " findIntervalTotalMs[%,d], submitTotalMs[%,d], slowestFindIntervalDs[%s] at [%,d]ms.", + dsScanned, dsWithNoInterval, submittedTasks, + findIntervalTotalMs, submitTotalMs, slowestFindDs, maxFindIntervalMs + ); stats.add(Stats.Kill.SUBMITTED_TASKS, submittedTasks); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkEternityTombstonesAsUnused.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkEternityTombstonesAsUnused.java index f332f11020c5..042a3ffacea8 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkEternityTombstonesAsUnused.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkEternityTombstonesAsUnused.java @@ -19,10 +19,10 @@ package org.apache.druid.server.coordinator.duty; -import com.google.common.base.Optional; import org.apache.druid.client.DataSourcesSnapshot; +import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; @@ -31,7 +31,6 @@ import 
org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.Partitions; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.partition.TombstoneShardSpec; @@ -82,15 +81,24 @@ public MarkEternityTombstonesAsUnused(final MetadataAction.DeleteSegments delete @Override public DruidCoordinatorRuntimeParams run(final DruidCoordinatorRuntimeParams params) { - DataSourcesSnapshot dataSourcesSnapshot = params.getDataSourcesSnapshot(); + final Stopwatch totalTime = Stopwatch.createStarted(); + final DataSourcesSnapshot dataSourcesSnapshot = params.getDataSourcesSnapshot(); - final Map> datasourceToNonOvershadowedEternityTombstones = + final Stopwatch determineTime = Stopwatch.createStarted(); + final Map> datasourceToNonOvershadowedEternityTombstones = determineNonOvershadowedEternityTombstones( dataSourcesSnapshot ); + determineTime.stop(); if (datasourceToNonOvershadowedEternityTombstones.isEmpty()) { - log.debug("No non-overshadowed eternity tombstones found."); + log.info( + "MarkEternityTombstonesAsUnused: no non-overshadowed eternity tombstones found." 
+ + " datasources[%d], overshadowedSegments[%,d], determineMs[%,d], totalMs[%,d].", + dataSourcesSnapshot.getDataSourcesMap().size(), + dataSourcesSnapshot.getOvershadowedSegments().size(), + determineTime.millisElapsed(), totalTime.millisElapsed() + ); return params; } @@ -98,11 +106,16 @@ public DruidCoordinatorRuntimeParams run(final DruidCoordinatorRuntimeParams par datasourceToNonOvershadowedEternityTombstones.size(), datasourceToNonOvershadowedEternityTombstones ); + final Stopwatch dbUpdateTime = Stopwatch.createStarted(); final CoordinatorRunStats stats = params.getCoordinatorStats(); + final int[] totalMarked = {0}; + final int[] candidates = {0}; datasourceToNonOvershadowedEternityTombstones.forEach((datasource, nonOvershadowedEternityTombstones) -> { + candidates[0] += nonOvershadowedEternityTombstones.size(); final RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, datasource); stats.add(Stats.Segments.UNNEEDED_ETERNITY_TOMBSTONE, datasourceKey, nonOvershadowedEternityTombstones.size()); final int unusedCount = deleteHandler.markSegmentsAsUnused(datasource, nonOvershadowedEternityTombstones); + totalMarked[0] += unusedCount; log.info( "Successfully marked [%d] non-overshadowed eternity tombstones[%s] of datasource[%s] as unused.", unusedCount, @@ -110,6 +123,16 @@ public DruidCoordinatorRuntimeParams run(final DruidCoordinatorRuntimeParams par datasource ); }); + dbUpdateTime.stop(); + + log.info( + "MarkEternityTombstonesAsUnused summary: datasources[%d], overshadowedSegments[%,d]," + + " candidates[%d], marked[%d]; determineMs[%,d], dbUpdateMs[%,d], totalMs[%,d].", + dataSourcesSnapshot.getDataSourcesMap().size(), + dataSourcesSnapshot.getOvershadowedSegments().size(), + candidates[0], totalMarked[0], + determineTime.millisElapsed(), dbUpdateTime.millisElapsed(), totalTime.millisElapsed() + ); return params; } @@ -132,43 +155,57 @@ public DruidCoordinatorRuntimeParams run(final DruidCoordinatorRuntimeParams par private Map> 
determineNonOvershadowedEternityTombstones(final DataSourcesSnapshot dataSourcesSnapshot) { final Map> datasourceToNonOvershadowedEternityTombstones = new HashMap<>(); + final Map> overshadowedSegmentsByDatasource = new HashMap<>(); + + for (final DataSegment overshadowedSegment : dataSourcesSnapshot.getOvershadowedSegments()) { + overshadowedSegmentsByDatasource + .computeIfAbsent(overshadowedSegment.getDataSource(), ds -> new HashSet<>()) + .add(overshadowedSegment); + } - dataSourcesSnapshot.getDataSourcesMap().keySet().forEach((datasource) -> { + for (final ImmutableDruidDataSource dataSource : dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()) { + final String datasource = dataSource.getName(); final SegmentTimeline usedSegmentsTimeline = dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource().get(datasource); + if (usedSegmentsTimeline == null) { + continue; + } - final Optional> usedNonOvershadowedSegments = - Optional.fromNullable(usedSegmentsTimeline) - .transform(timeline -> timeline.findNonOvershadowedObjectsInInterval( - Intervals.ETERNITY, - Partitions.ONLY_COMPLETE - )); - - if (usedNonOvershadowedSegments.isPresent()) { - usedNonOvershadowedSegments.get().forEach(candidateSegment -> { - if (isNewGenerationEternityTombstone(candidateSegment)) { - boolean overlaps = dataSourcesSnapshot.getOvershadowedSegments().stream() - .filter(overshadowedSegment -> - candidateSegment.getDataSource() - .equals(overshadowedSegment.getDataSource())) - .anyMatch( - overshadowedSegment -> - candidateSegment.getInterval() - .overlaps(overshadowedSegment.getInterval()) - ); - if (!overlaps) { - datasourceToNonOvershadowedEternityTombstones - .computeIfAbsent(datasource, ds -> new HashSet<>()) - .add(candidateSegment.getId()); - } - } - }); + for (final DataSegment candidateSegment : dataSource.getSegments()) { + if (isNewGenerationEternityTombstone(candidateSegment) + && !usedSegmentsTimeline.isOvershadowed(candidateSegment) + && !overlapsAnyOvershadowedSegment( 
+ candidateSegment, + overshadowedSegmentsByDatasource.get(candidateSegment.getDataSource()) + )) { + datasourceToNonOvershadowedEternityTombstones + .computeIfAbsent(datasource, ds -> new HashSet<>()) + .add(candidateSegment.getId()); + } } - }); + } return datasourceToNonOvershadowedEternityTombstones; } + private boolean overlapsAnyOvershadowedSegment( + final DataSegment candidateSegment, + final Set overshadowedSegments + ) + { + if (overshadowedSegments == null) { + return false; + } + + for (final DataSegment overshadowedSegment : overshadowedSegments) { + if (candidateSegment.getInterval().overlaps(overshadowedSegment.getInterval())) { + return true; + } + } + + return false; + } + private boolean isNewGenerationEternityTombstone(final DataSegment segment) { return segment.isTombstone() && segment.getShardSpec().getNumCorePartitions() == 0 && ( diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java index dba11d2c2901..38e0ea6133dd 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java @@ -66,53 +66,83 @@ public MarkOvershadowedSegmentsAsUnused(MetadataAction.DeleteSegments deleteHand @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { + final Stopwatch totalTime = Stopwatch.createStarted(); + // Mark overshadowed segments as unused only if the coordinator has been running // long enough to have refreshed its metadata view final Duration requiredDelay = Duration.millis( params.getCoordinatorDynamicConfig().getMarkSegmentAsUnusedDelayMillis() ); if (sinceCoordinatorStarted.hasNotElapsed(requiredDelay)) { + log.info( + "Skipping MarkOvershadowedSegmentsAsUnused; required delay[%s] not yet elapsed since 
coordinator start.", + requiredDelay + ); return params; } final Set allOvershadowedSegments = params.getDataSourcesSnapshot().getOvershadowedSegments(); if (allOvershadowedSegments.isEmpty()) { + log.info("No overshadowed segments found in metadata snapshot. Total time[%,d]ms.", totalTime.millisElapsed()); return params; } + // Identify the datasources that actually have overshadowed segments to check. + // Timelines only need to be built for these datasources. + final Set relevantDatasources = new HashSet<>(); + for (DataSegment s : allOvershadowedSegments) { + relevantDatasources.add(s.getDataSource()); + } + final DruidCluster cluster = params.getDruidCluster(); + final int totalDatasources = params.getDataSourcesSnapshot().getDataSourcesMap().size(); + + final Stopwatch timelineBuildTime = Stopwatch.createStarted(); final Map timelines = new HashMap<>(); + final long[] servedSegmentsCounter = {0}; cluster.getManagedHistoricals().values().forEach( historicals -> historicals.forEach( - historical -> addSegmentsFromServer(historical, timelines) + historical -> servedSegmentsCounter[0] += addSegmentsFromServer(historical, timelines, relevantDatasources) ) ); cluster.getBrokers().forEach( - broker -> addSegmentsFromServer(broker, timelines) + broker -> servedSegmentsCounter[0] += addSegmentsFromServer(broker, timelines, relevantDatasources) ); // Include all segments that require zero replicas to be loaded + final int[] zeroReplicaSegments = {0}; params.getSegmentAssigner().getSegmentsWithZeroRequiredReplicas().forEach( - (datasource, segments) -> timelines - .computeIfAbsent(datasource, ds -> new SegmentTimeline()) - .addSegments(segments.iterator()) + (datasource, segments) -> { + if (relevantDatasources.contains(datasource)) { + zeroReplicaSegments[0] += segments.size(); + timelines.computeIfAbsent(datasource, ds -> new SegmentTimeline()) + .addSegments(segments.iterator()); + } + } ); + timelineBuildTime.stop(); // Do not include segments served by ingestion 
services such as tasks or indexers, // to prevent unpublished segments from prematurely overshadowing segments. // Mark all segments overshadowed by served segments as unused + final Stopwatch checkTime = Stopwatch.createStarted(); final Map> datasourceToUnusedSegments = new HashMap<>(); + int verifiedOvershadowed = 0; for (DataSegment dataSegment : allOvershadowedSegments) { SegmentTimeline timeline = timelines.get(dataSegment.getDataSource()); if (timeline != null && timeline.isOvershadowed(dataSegment)) { datasourceToUnusedSegments.computeIfAbsent(dataSegment.getDataSource(), ds -> new HashSet<>()) .add(dataSegment.getId()); + verifiedOvershadowed++; } } + checkTime.stop(); + final Stopwatch dbUpdateTime = Stopwatch.createStarted(); final CoordinatorRunStats stats = params.getCoordinatorStats(); + final int[] totalMarked = {0}; datasourceToUnusedSegments.forEach( (datasource, unusedSegments) -> { RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, datasource); @@ -120,27 +150,48 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) final Stopwatch updateTime = Stopwatch.createStarted(); int updatedCount = deleteHandler.markSegmentsAsUnused(datasource, unusedSegments); + totalMarked[0] += updatedCount; log.info( "Marked [%d] segments of datasource[%s] as unused in [%,d]ms.", updatedCount, datasource, updateTime.millisElapsed() ); } ); + dbUpdateTime.stop(); + + log.info( + "MarkOvershadowedSegmentsAsUnused summary: overshadowedInMetadata[%,d], relevantDatasources[%d/%d]," + + " servedSegmentsAdded[%,d], zeroReplicaSegmentsAdded[%,d], verifiedOvershadowed[%,d], marked[%,d];" + + " timelineBuildMs[%,d], checkMs[%,d], dbUpdateMs[%,d], totalMs[%,d].", + allOvershadowedSegments.size(), relevantDatasources.size(), totalDatasources, + servedSegmentsCounter[0], zeroReplicaSegments[0], verifiedOvershadowed, totalMarked[0], + timelineBuildTime.millisElapsed(), checkTime.millisElapsed(), dbUpdateTime.millisElapsed(), + 
totalTime.millisElapsed() + ); return params; } - private void addSegmentsFromServer( + private long addSegmentsFromServer( ServerHolder serverHolder, - Map timelines + Map timelines, + Set relevantDatasources ) { - ImmutableDruidServer server = serverHolder.getServer(); + final ImmutableDruidServer server = serverHolder.getServer(); + long added = 0; + + for (final ImmutableDruidDataSource dataSource : server.getDataSources()) { + if (!relevantDatasources.contains(dataSource.getName())) { + continue; + } - for (ImmutableDruidDataSource dataSource : server.getDataSources()) { + final int n = dataSource.getSegments().size(); + added += n; timelines .computeIfAbsent(dataSource.getName(), dsName -> new SegmentTimeline()) .addSegments(dataSource.getSegments().iterator()); } + return added; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java index ec0ddcaf0d09..8870450fb016 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java @@ -20,8 +20,10 @@ package org.apache.druid.server.coordinator.duty; import org.apache.druid.client.DruidServer; +import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.ServerInventoryView; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.DruidCluster; @@ -36,7 +38,6 @@ import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; -import 
org.apache.druid.timeline.DataSegment; import java.util.HashSet; import java.util.List; @@ -82,19 +83,34 @@ public PrepareBalancerAndLoadQueues( @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { + final Stopwatch totalTime = Stopwatch.createStarted(); + + final Stopwatch prepareServersTime = Stopwatch.createStarted(); List currentServers = prepareCurrentServers(); + prepareServersTime.stop(); taskMaster.resetPeonsForNewServers(currentServers); final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig(); final SegmentLoadingConfig segmentLoadingConfig = SegmentLoadingConfig.create(dynamicConfig, params.getUsedSegmentCount()); + final Stopwatch prepareClusterTime = Stopwatch.createStarted(); final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers); - cancelLoadsOnDecommissioningServers(cluster); + prepareClusterTime.stop(); + + final Stopwatch cancelDecommTime = Stopwatch.createStarted(); + final int cancelledLoads = cancelLoadsOnDecommissioningServers(cluster); + cancelDecommTime.stop(); final CoordinatorRunStats stats = params.getCoordinatorStats(); + final Stopwatch histStatsTime = Stopwatch.createStarted(); collectHistoricalStats(cluster, stats, dynamicConfig.getTierToAliasName()); - collectUsedSegmentStats(params, stats); + histStatsTime.stop(); + + final Stopwatch usedSegStatsTime = Stopwatch.createStarted(); + final long[] usedSegStatsCounts = collectUsedSegmentStats(params, stats); + usedSegStatsTime.stop(); + collectDebugStats(segmentLoadingConfig, stats); final int numBalancerThreads = segmentLoadingConfig.getBalancerComputeThreads(); @@ -104,6 +120,18 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) balancerStrategy.getClass().getSimpleName(), numBalancerThreads ); + log.info( + "PrepareBalancerAndLoadQueues summary: servers[%d], usedSegmentCount[%,d], dsScanned[%,d]," + + " totalSegmentsInTimelines[%,d], 
totalUsedBytes[%,d], cancelledLoadsOnDecomm[%d];" + + " prepareServersMs[%,d], prepareClusterMs[%,d], cancelDecommMs[%,d], histStatsMs[%,d]," + + " usedSegStatsMs[%,d], totalMs[%,d].", + currentServers.size(), params.getUsedSegmentCount(), usedSegStatsCounts[0], + usedSegStatsCounts[1], usedSegStatsCounts[2], cancelledLoads, + prepareServersTime.millisElapsed(), prepareClusterTime.millisElapsed(), + cancelDecommTime.millisElapsed(), histStatsTime.millisElapsed(), + usedSegStatsTime.millisElapsed(), totalTime.millisElapsed() + ); + return params.buildFromExisting() .withDruidCluster(cluster) .withBalancerStrategy(balancerStrategy) @@ -117,7 +145,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) * be done before initializing the SegmentReplicantLookup so that * under-replicated segments can be assigned in the current run itself. */ - private void cancelLoadsOnDecommissioningServers(DruidCluster cluster) + private int cancelLoadsOnDecommissioningServers(DruidCluster cluster) { final AtomicInteger cancelledCount = new AtomicInteger(0); final List decommissioningServers @@ -135,6 +163,7 @@ private void cancelLoadsOnDecommissioningServers(DruidCluster cluster) } ); } + return cancelledCount.get(); } private List prepareCurrentServers() @@ -201,16 +230,25 @@ private void collectHistoricalStats( }); } - private void collectUsedSegmentStats(DruidCoordinatorRuntimeParams params, CoordinatorRunStats stats) + /** + * @return {dsCount, totalSegments, totalBytes} + */ + private long[] collectUsedSegmentStats(DruidCoordinatorRuntimeParams params, CoordinatorRunStats stats) { - params.getUsedSegmentsTimelinesPerDataSource().forEach((dataSource, timeline) -> { - long totalSizeOfUsedSegments = timeline.iterateAllObjects().stream() - .mapToLong(DataSegment::getSize).sum(); + final long[] counts = new long[3]; + for (final ImmutableDruidDataSource dataSource : params.getDataSourcesSnapshot().getDataSourcesWithAllUsedSegments()) { + final long 
totalSizeOfUsedSegments = dataSource.getTotalSizeOfSegments(); + final int numUsedSegments = dataSource.getSegments().size(); - RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource); + final RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource.getName()); stats.add(Stats.Segments.USED_BYTES, datasourceKey, totalSizeOfUsedSegments); - stats.add(Stats.Segments.USED, datasourceKey, timeline.getNumObjects()); - }); + stats.add(Stats.Segments.USED, datasourceKey, numUsedSegments); + + counts[0] += 1; + counts[1] += numUsedSegments; + counts[2] += totalSizeOfUsedSegments; + } + return counts; } private void collectDebugStats(SegmentLoadingConfig config, CoordinatorRunStats stats) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java index 88ea485f6797..9bdf132ed0c3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java @@ -41,6 +41,7 @@ import org.joda.time.Interval; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -76,6 +77,8 @@ public RunRules( @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { + final Stopwatch totalTime = Stopwatch.createStarted(); + final DruidCluster cluster = params.getDruidCluster(); if (cluster.isEmpty()) { log.warn("Cluster has no servers. Not running any rules."); @@ -98,6 +101,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) final DateTime now = DateTimes.nowUtc(); final Object2IntOpenHashMap datasourceToSegmentsWithNoRule = new Object2IntOpenHashMap<>(); + final Map> datasourceToRules = new HashMap<>(); // Streaming shard-group boundary state. 
SegmentHolder.NEWEST_SEGMENT_FIRST groups segments contiguously by // (dataSource, interval, version), so on any change in that triple we flush the buffer for the previous group. @@ -105,10 +109,15 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) Interval currentInterval = null; String currentVersion = null; - for (DataSegment segment : usedSegments) { + final Stopwatch ruleEvalTime = Stopwatch.createStarted(); + long overshadowedSkipped = 0; + long evaluated = 0; + long noRuleMatch = 0; + for (final DataSegment segment : usedSegments) { // Do not apply rules on overshadowed segments as they will be // marked unused and eventually unloaded from all historicals if (overshadowed.contains(segment)) { + overshadowedSkipped++; continue; } @@ -123,9 +132,13 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) } // Find and apply matching rule - List rules = ruleHandler.getRulesWithDefault(segment.getDataSource()); + final List rules = datasourceToRules.computeIfAbsent( + segment.getDataSource(), + ruleHandler::getRulesWithDefault + ); + evaluated++; boolean foundMatchingRule = false; - for (Rule rule : rules) { + for (final Rule rule : rules) { if (rule.appliesTo(segment, now)) { rule.run(segment, segmentHandler); foundMatchingRule = true; @@ -135,18 +148,38 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) if (!foundMatchingRule) { datasourceToSegmentsWithNoRule.addTo(segment.getDataSource(), 1); + noRuleMatch++; } } + ruleEvalTime.stop(); // Tail flush for the last shard group. 
segmentHandler.flushAndReset(); + final Stopwatch deleteTime = Stopwatch.createStarted(); + final int deleteDatasources = segmentAssigner.getSegmentsToDelete().size(); processSegmentDeletes(segmentAssigner, params.getCoordinatorStats()); + deleteTime.stop(); + alertForSegmentsWithNoRules(datasourceToSegmentsWithNoRule); alertForInvalidRules(segmentAssigner); + final Stopwatch broadcastTime = Stopwatch.createStarted(); + final Set broadcastDatasources = getBroadcastDatasources(params, datasourceToRules); + broadcastTime.stop(); + + log.info( + "RunRules summary: usedSegments[%,d], overshadowedSkipped[%,d], evaluated[%,d], noRuleMatch[%,d]," + + " ruleLookupCalls[%,d], deleteDatasources[%d], broadcastDatasources[%d];" + + " ruleEvalMs[%,d], deleteMs[%,d], broadcastScanMs[%,d], totalMs[%,d].", + usedSegments.size(), overshadowedSkipped, evaluated, noRuleMatch, datasourceToRules.size(), + deleteDatasources, broadcastDatasources.size(), + ruleEvalTime.millisElapsed(), deleteTime.millisElapsed(), + broadcastTime.millisElapsed(), totalTime.millisElapsed() + ); + return params.buildFromExisting() - .withBroadcastDatasources(getBroadcastDatasources(params)) + .withBroadcastDatasources(broadcastDatasources) .build(); } @@ -190,11 +223,14 @@ private void alertForInvalidRules(StrategicSegmentAssigner segmentAssigner) ); } - private Set getBroadcastDatasources(DruidCoordinatorRuntimeParams params) + private Set getBroadcastDatasources( + DruidCoordinatorRuntimeParams params, + Map> datasourceToRules + ) { return params.getDataSourcesSnapshot().getDataSourcesMap().values().stream() .map(ImmutableDruidDataSource::getName) - .filter(this::isBroadcastDatasource) + .filter(datasource -> isBroadcastDatasource(datasource, datasourceToRules)) .collect(Collectors.toSet()); } @@ -206,9 +242,9 @@ private Set getBroadcastDatasources(DruidCoordinatorRuntimeParams params *
<li>Are unloaded if unused, even from realtime servers</li>
  • * */ - private boolean isBroadcastDatasource(String datasource) + private boolean isBroadcastDatasource(String datasource, Map> datasourceToRules) { - return ruleHandler.getRulesWithDefault(datasource) + return datasourceToRules.computeIfAbsent(datasource, ruleHandler::getRulesWithDefault) .stream() .anyMatch(rule -> rule instanceof BroadcastDistributionRule); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java index 761e3383ede1..1ec4be8708df 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java @@ -21,6 +21,7 @@ import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.ServerHolder; @@ -57,24 +58,35 @@ public UnloadUnusedSegments( @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { + final Stopwatch totalTime = Stopwatch.createStarted(); + final Map broadcastStatusByDatasource = new HashMap<>(); for (String broadcastDatasource : params.getBroadcastDatasources()) { broadcastStatusByDatasource.put(broadcastDatasource, true); } final List allServers = params.getDruidCluster().getAllManagedServers(); + + final Stopwatch cancelTime = Stopwatch.createStarted(); int numCancelledLoads = allServers.stream().mapToInt( server -> cancelLoadOfUnusedSegments(server, broadcastStatusByDatasource, params) ).sum(); + cancelTime.stop(); final CoordinatorRunStats stats = params.getCoordinatorStats(); + final Stopwatch dropTime = Stopwatch.createStarted(); int numQueuedDrops = 
allServers.stream().mapToInt( server -> dropUnusedSegments(server, params, stats, broadcastStatusByDatasource) ).sum(); - - if (numCancelledLoads > 0 || numQueuedDrops > 0) { - log.debug("Cancelled [%d] loads and started [%d] drops of unused segments.", numCancelledLoads, numQueuedDrops); - } + dropTime.stop(); + + log.info( + "UnloadUnusedSegments summary: servers[%d], broadcastDatasources[%d]," + + " cancelledLoads[%d], queuedDrops[%d]; cancelMs[%,d], dropMs[%,d], totalMs[%,d].", + allServers.size(), params.getBroadcastDatasources().size(), + numCancelledLoads, numQueuedDrops, + cancelTime.millisElapsed(), dropTime.millisElapsed(), totalTime.millisElapsed() + ); return params; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatus.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatus.java index 7121642f25ed..ff57019fdb11 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatus.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatus.java @@ -19,7 +19,9 @@ package org.apache.druid.server.coordinator.loading; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import it.unimi.dsi.fastutil.objects.Object2LongMap; import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.druid.timeline.DataSegment; @@ -40,15 +42,18 @@ public class SegmentReplicationStatus public SegmentReplicationStatus(Map> replicaCountsInTier) { - this.replicaCountsInTier = ImmutableMap.copyOf(replicaCountsInTier); + // replicaCountsInTier is owned by the caller's SegmentReplicaCountMap, rebuilt fresh each + // coordinator cycle and not mutated again once this constructor runs, so a defensive + // copy is unnecessary; hold the reference directly 
and compute totals in the same pass. + this.replicaCountsInTier = replicaCountsInTier; - final Map totalReplicaCounts = new HashMap<>(); - replicaCountsInTier.forEach((segmentId, tierToReplicaCount) -> { + final Map totalReplicaCounts = Maps.newHashMapWithExpectedSize(replicaCountsInTier.size()); + for (Map.Entry> entry : replicaCountsInTier.entrySet()) { final SegmentReplicaCount total = new SegmentReplicaCount(); - tierToReplicaCount.values().forEach(total::accumulate); - totalReplicaCounts.put(segmentId, total); - }); - this.totalReplicaCounts = ImmutableMap.copyOf(totalReplicaCounts); + entry.getValue().values().forEach(total::accumulate); + totalReplicaCounts.put(entry.getKey(), total); + } + this.totalReplicaCounts = totalReplicaCounts; } public SegmentReplicaCount getReplicaCountsInCluster(SegmentId segmentId) @@ -81,4 +86,84 @@ public Map> getTierToDatasourceToUnderReplicated( return tierToUnderReplicated; } + + /** + * Computes unavailable, under-replicated and deep-storage-only segment counts in a single + * pass over {@code usedSegments}, instead of three independent full iterations. Produces + * results identical to calling {@link #getReplicaCountsInCluster} and + * {@link #getTierToDatasourceToUnderReplicated} independently for each segment. + * + * @param ignoreMissingServers same semantics as in {@link #getTierToDatasourceToUnderReplicated}. 
+ */ + public SegmentStatsSnapshot computeSegmentStats(Iterable usedSegments, boolean ignoreMissingServers) + { + final Object2IntOpenHashMap datasourceToUnavailable = new Object2IntOpenHashMap<>(); + final Object2IntOpenHashMap datasourceToDeepStorageOnly = new Object2IntOpenHashMap<>(); + final Map> tierToUnderReplicated = new HashMap<>(); + + for (DataSegment segment : usedSegments) { + final SegmentId segmentId = segment.getId(); + final String datasource = segment.getDataSource(); + + final SegmentReplicaCount totalCount = totalReplicaCounts.get(segmentId); + if (totalCount != null && (totalCount.totalLoaded() > 0 || totalCount.required() == 0)) { + datasourceToUnavailable.addTo(datasource, 0); + } else { + datasourceToUnavailable.addTo(datasource, 1); + } + if (totalCount != null && totalCount.totalLoaded() == 0 && totalCount.required() == 0) { + datasourceToDeepStorageOnly.addTo(datasource, 1); + } + + final Map tierToReplicaCount = replicaCountsInTier.get(segmentId); + if (tierToReplicaCount != null) { + tierToReplicaCount.forEach((tier, counts) -> { + final int underReplicated = ignoreMissingServers ? counts.missing() : counts.missingAndLoadable(); + if (underReplicated >= 0) { + Object2LongOpenHashMap datasourceToUnderReplicated = (Object2LongOpenHashMap) + tierToUnderReplicated.computeIfAbsent(tier, ds -> new Object2LongOpenHashMap<>()); + datasourceToUnderReplicated.addTo(datasource, underReplicated); + } + }); + } + } + + return new SegmentStatsSnapshot(datasourceToUnavailable, tierToUnderReplicated, datasourceToDeepStorageOnly); + } + + /** + * Holder for the three segment-stat views computed together by {@link #computeSegmentStats}. 
+ */ + public static class SegmentStatsSnapshot + { + private final Object2IntMap datasourceToUnavailableCount; + private final Map> tierToDatasourceToUnderReplicatedCount; + private final Object2IntMap datasourceToDeepStorageOnlyCount; + + SegmentStatsSnapshot( + Object2IntMap datasourceToUnavailableCount, + Map> tierToDatasourceToUnderReplicatedCount, + Object2IntMap datasourceToDeepStorageOnlyCount + ) + { + this.datasourceToUnavailableCount = datasourceToUnavailableCount; + this.tierToDatasourceToUnderReplicatedCount = tierToDatasourceToUnderReplicatedCount; + this.datasourceToDeepStorageOnlyCount = datasourceToDeepStorageOnlyCount; + } + + public Object2IntMap getDatasourceToUnavailableCount() + { + return datasourceToUnavailableCount; + } + + public Map> getTierToDatasourceToUnderReplicatedCount() + { + return tierToDatasourceToUnderReplicatedCount; + } + + public Object2IntMap getDatasourceToDeepStorageOnlyCount() + { + return datasourceToDeepStorageOnlyCount; + } + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java index fc6936e4df19..45c00fa2a7b2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java @@ -19,7 +19,6 @@ package org.apache.druid.server.coordinator.loading; -import com.google.common.collect.Sets; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import org.apache.druid.client.DruidServer; import org.apache.druid.server.coordinator.DruidCluster; @@ -64,6 +63,10 @@ public class StrategicSegmentAssigner implements SegmentActionHandler private final RoundRobinServerSelector serverSelector; private final BalancerStrategy strategy; + // Fixed for the lifetime of this assigner (a new instance is created each coordinator run), + // so 
it is computed once here rather than on every replicateSegment/replicateSegmentPartially call. + private final Set allTiersInCluster; + private final boolean useRoundRobinAssignment; private final Map> historicalTierAliases; private final Map tierToAliasName; @@ -86,6 +89,9 @@ public StrategicSegmentAssigner( this.cluster = cluster; this.strategy = strategy; this.loadQueueManager = loadQueueManager; + final Set allTiersInCluster = new HashSet<>(); + cluster.getTierNames().forEach(allTiersInCluster::add); + this.allTiersInCluster = allTiersInCluster; this.replicaCountMap = SegmentReplicaCountMap.create(cluster); this.replicationThrottler = createReplicationThrottler(cluster, loadingConfig); this.useRoundRobinAssignment = loadingConfig.isUseRoundRobinSegmentAssignment(); @@ -234,7 +240,6 @@ private Map expandWithAliases(Map tierToReplic public void replicateSegment(DataSegment segment, Map tierToReplicaCount) { final Map effectiveTierToReplicaCount = expandWithAliases(tierToReplicaCount); - final Set allTiersInCluster = Sets.newHashSet(cluster.getTierNames()); if (effectiveTierToReplicaCount.isEmpty()) { // Track the counts for a segment even if it requires 0 replicas on all tiers @@ -290,7 +295,6 @@ public void replicateSegmentPartially( ) { final Map effectiveTierToReplicaCount = expandWithAliases(tierToReplicaCount); - final Set allTiersInCluster = Sets.newHashSet(cluster.getTierNames()); if (effectiveTierToReplicaCount.isEmpty()) { replicaCountMap.computeIfAbsent(segment.getId(), DruidServer.DEFAULT_TIER); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatusTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatusTest.java new file mode 100644 index 000000000000..f9760bb2c613 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatusTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.loading; + +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2LongMap; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.server.coordinator.CreateDataSegments; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Verifies that {@link SegmentReplicationStatus#computeSegmentStats} produces results + * identical to computing unavailable, under-replicated and deep-storage-only counts + * independently via {@link SegmentReplicationStatus#getReplicaCountsInCluster} and + * {@link SegmentReplicationStatus#getTierToDatasourceToUnderReplicated}. 
+ */ +public class SegmentReplicationStatusTest +{ + private static final String TIER_1 = "tier1"; + private static final String TIER_2 = "tier2"; + + private final List segments = CreateDataSegments + .ofDatasource("wiki") + .forIntervals(5, Granularities.DAY) + .eachOfSize(100); + + @Test + public void testComputeSegmentStatsMatchesIndependentComputation() + { + final Map> replicaCountsInTier = new HashMap<>(); + + // Segment 0: fully loaded, single tier, no under-replication. + replicaCountsInTier.put(segments.get(0).getId(), Map.of(TIER_1, countOf(2, 2, 2))); + + // Segment 1: unavailable (required > 0, nothing loaded). + replicaCountsInTier.put(segments.get(1).getId(), Map.of(TIER_1, countOf(2, 2, 0))); + + // Segment 2: deep-storage-only (required == 0, nothing loaded). + replicaCountsInTier.put(segments.get(2).getId(), Map.of(TIER_1, countOf(0, 0, 0))); + + // Segment 3: under-replicated in one tier, satisfied in another. + final Map tierMap3 = new HashMap<>(); + tierMap3.put(TIER_1, countOf(2, 2, 1)); + tierMap3.put(TIER_2, countOf(1, 1, 1)); + replicaCountsInTier.put(segments.get(3).getId(), tierMap3); + + // Segment 4: present in the used-segment list but absent from the replication map + // (e.g. metadata race) -- must be treated as unavailable, not throw. + // Intentionally not added to replicaCountsInTier. 
+ + final SegmentReplicationStatus status = new SegmentReplicationStatus(replicaCountsInTier); + + for (boolean ignoreMissingServers : new boolean[]{true, false}) { + final SegmentReplicationStatus.SegmentStatsSnapshot snapshot = + status.computeSegmentStats(segments, ignoreMissingServers); + + final Object2IntMap expectedUnavailable = computeExpectedUnavailable(status, segments); + final Object2IntMap expectedDeepStorageOnly = computeExpectedDeepStorageOnly(status, segments); + final Map> expectedUnderReplicated = + status.getTierToDatasourceToUnderReplicated(segments, ignoreMissingServers); + + Assert.assertEquals(expectedUnavailable, snapshot.getDatasourceToUnavailableCount()); + Assert.assertEquals(expectedDeepStorageOnly, snapshot.getDatasourceToDeepStorageOnlyCount()); + Assert.assertEquals(expectedUnderReplicated, snapshot.getTierToDatasourceToUnderReplicatedCount()); + } + } + + @Test + public void testIgnoreMissingServersUsesMissingNotMissingAndLoadable() + { + final Map> replicaCountsInTier = new HashMap<>(); + + // required=3, requiredAndLoadable=1 (only 1 loadable server), loaded=0. + // missing() = 3, missingAndLoadable() = 1. 
+ final SegmentReplicaCount count = new SegmentReplicaCount(); + count.setRequired(3, 1); + replicaCountsInTier.put(segments.get(0).getId(), Map.of(TIER_1, count)); + + final SegmentReplicationStatus status = new SegmentReplicationStatus(replicaCountsInTier); + final List singleSegment = List.of(segments.get(0)); + + final SegmentReplicationStatus.SegmentStatsSnapshot ignoreMissing = + status.computeSegmentStats(singleSegment, true); + Assert.assertEquals( + 3L, + ignoreMissing.getTierToDatasourceToUnderReplicatedCount().get(TIER_1).getLong("wiki") + ); + + final SegmentReplicationStatus.SegmentStatsSnapshot respectMissing = + status.computeSegmentStats(singleSegment, false); + Assert.assertEquals( + 1L, + respectMissing.getTierToDatasourceToUnderReplicatedCount().get(TIER_1).getLong("wiki") + ); + } + + private static SegmentReplicaCount countOf(int required, int requiredAndLoadable, int loaded) + { + final SegmentReplicaCount count = new SegmentReplicaCount(); + count.setRequired(required, requiredAndLoadable); + for (int i = 0; i < loaded; i++) { + count.incrementLoaded(); + } + return count; + } + + private static Object2IntMap computeExpectedUnavailable( + SegmentReplicationStatus status, + List segments + ) + { + final it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap result = + new it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap<>(); + for (DataSegment segment : segments) { + final SegmentReplicaCount rc = status.getReplicaCountsInCluster(segment.getId()); + if (rc != null && (rc.totalLoaded() > 0 || rc.required() == 0)) { + result.addTo(segment.getDataSource(), 0); + } else { + result.addTo(segment.getDataSource(), 1); + } + } + return result; + } + + private static Object2IntMap computeExpectedDeepStorageOnly( + SegmentReplicationStatus status, + List segments + ) + { + final it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap result = + new it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap<>(); + for (DataSegment segment : segments) { + final 
SegmentReplicaCount rc = status.getReplicaCountsInCluster(segment.getId()); + if (rc != null && rc.totalLoaded() == 0 && rc.required() == 0) { + result.addTo(segment.getDataSource(), 1); + } + } + return result; + } +}