getAppliedRules()
+ {
+ return appliedRules;
+ }
+
+ /** True if a compaction job should be emitted for this interval (rules resolved and not skipped). */
+ boolean isJobEligible()
+ {
+ return config != null
+ && (disposition == IntervalDisposition.INCLUDED || disposition == IntervalDisposition.TRUNCATED);
+ }
+ }
+
+ /**
+ * The configured skip offset resolved against the live segment timeline. Present only when
+ * a skip offset is configured on the template. Always fully resolved — there is no
+ * "configured but not applied" state in the plan, because the planner requires a live timeline.
+ */
+ static final class SkipOffsetResolution
+ {
+ enum Type
+ {
+ FROM_NOW,
+ FROM_LATEST
+ }
+
+ private final Type type;
+ private final Period period;
+ private final DateTime effectiveEndTime;
+
+ SkipOffsetResolution(Type type, Period period, DateTime effectiveEndTime)
+ {
+ this.type = type;
+ this.period = period;
+ this.effectiveEndTime = effectiveEndTime;
+ }
+
+ Type getType()
+ {
+ return type;
+ }
+
+ Period getPeriod()
+ {
+ return period;
+ }
+
+ DateTime getEffectiveEndTime()
+ {
+ return effectiveEndTime;
+ }
+ }
+
+ /**
+ * Captured when planning fails before any intervals can be produced (e.g., misconfigured
+ * granularity ordering, no rules). The plan still carries the error so callers can surface
+ * it as part of the response rather than as an exception.
+ */
+ static final class PlanValidationError
+ {
+ enum Type
+ {
+ INVALID_GRANULARITY_TIMELINE,
+ VALIDATION_ERROR
+ }
+
+ private final Type type;
+ private final String message;
+ @Nullable
+ private final Interval olderInterval;
+ @Nullable
+ private final Granularity olderGranularity;
+ @Nullable
+ private final Interval newerInterval;
+ @Nullable
+ private final Granularity newerGranularity;
+
+ PlanValidationError(Type type, String message)
+ {
+ this(type, message, null, null, null, null);
+ }
+
+ PlanValidationError(
+ Type type,
+ String message,
+ @Nullable Interval olderInterval,
+ @Nullable Granularity olderGranularity,
+ @Nullable Interval newerInterval,
+ @Nullable Granularity newerGranularity
+ )
+ {
+ this.type = type;
+ this.message = message;
+ this.olderInterval = olderInterval;
+ this.olderGranularity = olderGranularity;
+ this.newerInterval = newerInterval;
+ this.newerGranularity = newerGranularity;
+ }
+
+ Type getType()
+ {
+ return type;
+ }
+
+ String getMessage()
+ {
+ return message;
+ }
+
+ @Nullable
+ Interval getOlderInterval()
+ {
+ return olderInterval;
+ }
+
+ @Nullable
+ Granularity getOlderGranularity()
+ {
+ return olderGranularity;
+ }
+
+ @Nullable
+ Interval getNewerInterval()
+ {
+ return newerInterval;
+ }
+
+ @Nullable
+ Granularity getNewerGranularity()
+ {
+ return newerGranularity;
+ }
+ }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingPlanner.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingPlanner.java
new file mode 100644
index 000000000000..aae5bd85e46c
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingPlanner.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.compaction.IntervalPartitioningInfo;
+import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
+import org.apache.druid.timeline.SegmentTimeline;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Builds a {@link ReindexingPlan} from a {@link CascadingReindexingTemplate} for a given reference
+ * time and the live segment timeline. The plan is the single source of truth for both
+ * compaction-job creation and the API timeline view, ensuring the two cannot drift.
+ *
+ * The data range bounds the planned intervals (no-overlap intervals become
+ * {@link ReindexingPlan.IntervalDisposition#SKIPPED_NO_DATA}), {@code skipOffsetFromLatest} is
+ * anchored on the live data, and intervals that extend past the skip boundary are truncated or
+ * marked {@link ReindexingPlan.IntervalDisposition#SKIPPED_BEYOND_BOUNDARY}.
+ *
+ *
Validation failures (misconfigured granularity ordering, no rules) are caught and surfaced
+ * on the returned plan as a {@link ReindexingPlan.PlanValidationError} so HTTP callers can render
+ * them in the response rather than as 500s.
+ */
+final class ReindexingPlanner
+{
+ private static final Logger LOG = new Logger(ReindexingPlanner.class);
+
+ private final CascadingReindexingTemplate template;
+
+ ReindexingPlanner(CascadingReindexingTemplate template)
+ {
+ this.template = template;
+ }
+
+ /**
+ * Build a plan for the given reference time. The plan reflects exactly what
+ * {@link CascadingReindexingTemplate#createCompactionJobs} would do given the same
+ * {@code timeline} and reference time.
+ *
+ * @param timeline live segment timeline for the datasource; must be non-null and non-empty —
+ * callers are responsible for short-circuiting when no data exists rather than
+ * calling this with a fabricated empty timeline.
+ */
+ ReindexingPlan plan(DateTime referenceTime, SegmentTimeline timeline)
+ {
+ final String dataSource = template.getDataSource();
+
+ if (!template.getReindexingRuleProvider().isReady()) {
+ LOG.info(
+ "Rule provider [%s] is not ready for dataSource[%s], returning empty plan",
+ template.getReindexingRuleProvider().getType(),
+ dataSource
+ );
+ return new ReindexingPlan(dataSource, referenceTime, null, Collections.emptyList(), null);
+ }
+
+ final List searchIntervals;
+ try {
+ searchIntervals = template.generateAlignedSearchIntervals(referenceTime);
+ }
+ catch (SegmentGranularityTimelineValidationException e) {
+ LOG.warn(e, "Granularity timeline validation failed for dataSource[%s]", dataSource);
+ return new ReindexingPlan(
+ dataSource,
+ referenceTime,
+ null,
+ Collections.emptyList(),
+ new ReindexingPlan.PlanValidationError(
+ ReindexingPlan.PlanValidationError.Type.INVALID_GRANULARITY_TIMELINE,
+ e.getMessage(),
+ e.getOlderInterval(),
+ e.getOlderGranularity(),
+ e.getNewerInterval(),
+ e.getNewerGranularity()
+ )
+ );
+ }
+ catch (IAE e) {
+ LOG.warn(e, "Validation failed while planning timeline for dataSource[%s]", dataSource);
+ return new ReindexingPlan(
+ dataSource,
+ referenceTime,
+ null,
+ Collections.emptyList(),
+ new ReindexingPlan.PlanValidationError(
+ ReindexingPlan.PlanValidationError.Type.VALIDATION_ERROR,
+ e.getMessage()
+ )
+ );
+ }
+
+ if (searchIntervals.isEmpty()) {
+ LOG.debug("No search intervals generated for dataSource[%s]", dataSource);
+ return new ReindexingPlan(dataSource, referenceTime, null, Collections.emptyList(), null);
+ }
+
+ final ReindexingPlan.SkipOffsetResolution skipOffsetResolution = buildSkipOffsetResolution(timeline, referenceTime);
+ final Interval dataRangeWithSkipOffset = computeDataRangeWithSkipOffset(timeline, referenceTime);
+ if (dataRangeWithSkipOffset == null) {
+ LOG.debug("All data for dataSource[%s] is within skip offsets; no intervals will be planned", dataSource);
+ return new ReindexingPlan(dataSource, referenceTime, skipOffsetResolution, Collections.emptyList(), null);
+ }
+
+ final List planned = new ArrayList<>(searchIntervals.size());
+ for (int i = 0; i < searchIntervals.size(); i++) {
+ final IntervalPartitioningInfo originalInfo = searchIntervals.get(i);
+ final Interval originalInterval = originalInfo.getInterval();
+
+ if (!originalInterval.overlaps(dataRangeWithSkipOffset)) {
+ planned.add(noRuleEntry(originalInterval, originalInterval, ReindexingPlan.IntervalDisposition.SKIPPED_NO_DATA, originalInfo));
+ continue;
+ }
+
+ // When a skip offset is configured, intervals extending past the adjusted data-range end
+ // must be truncated (or skipped entirely if the truncation eats the whole interval).
+ final DateTime truncationBoundary =
+ (template.getSkipOffsetFromNowOrNull() != null || template.getSkipOffsetFromLatestOrNull() != null)
+ ? dataRangeWithSkipOffset.getEnd()
+ : null;
+
+ Interval effectiveInterval = originalInterval;
+ ReindexingPlan.IntervalDisposition disposition = ReindexingPlan.IntervalDisposition.INCLUDED;
+
+ if (truncationBoundary != null && originalInterval.getEnd().isAfter(truncationBoundary)) {
+ final DateTime alignedEnd = originalInfo.getGranularity().bucketStart(truncationBoundary);
+ if (!alignedEnd.isAfter(originalInterval.getStart())) {
+ planned.add(noRuleEntry(originalInterval, originalInterval, ReindexingPlan.IntervalDisposition.SKIPPED_BEYOND_BOUNDARY, originalInfo));
+ continue;
+ }
+ effectiveInterval = new Interval(originalInterval.getStart(), alignedEnd);
+ disposition = ReindexingPlan.IntervalDisposition.TRUNCATED;
+ // Replace the entry so the synthetic-timeline lookup in ReindexingConfigBuilder matches the truncated interval.
+ searchIntervals.set(
+ i,
+ new IntervalPartitioningInfo(effectiveInterval, originalInfo.getSourceRule(), originalInfo.isRuleSynthetic())
+ );
+ }
+
+ final InlineSchemaDataSourceCompactionConfig.Builder builder = template.createBaseConfigBuilder();
+ final ReindexingConfigBuilder configBuilder = new ReindexingConfigBuilder(
+ template.getReindexingRuleProvider(),
+ effectiveInterval,
+ referenceTime,
+ searchIntervals,
+ template.getTuningConfig()
+ );
+ final ReindexingConfigBuilder.BuildResult buildResult = configBuilder.applyToWithDetails(builder);
+
+ if (buildResult.getRuleCount() > 0) {
+ planned.add(new ReindexingPlan.PlannedInterval(
+ effectiveInterval,
+ originalInterval,
+ disposition,
+ originalInfo.getSourceRule(),
+ originalInfo.isRuleSynthetic(),
+ originalInfo.getGranularity(),
+ buildResult.getRuleCount(),
+ builder.build(),
+ buildResult.getAppliedRules()
+ ));
+ } else {
+ // Interval is geometrically valid but no rules apply to it. We retain INCLUDED/TRUNCATED disposition
+ // (the geometry didn't skip it) but mark config null so it won't generate a job and the view can
+ // choose to hide it.
+ planned.add(new ReindexingPlan.PlannedInterval(
+ effectiveInterval,
+ originalInterval,
+ disposition,
+ originalInfo.getSourceRule(),
+ originalInfo.isRuleSynthetic(),
+ originalInfo.getGranularity(),
+ 0,
+ null,
+ Collections.emptyList()
+ ));
+ }
+ }
+
+ return new ReindexingPlan(dataSource, referenceTime, skipOffsetResolution, planned, null);
+ }
+
+ /**
+ * Computes the data-range interval clipped by skip offsets. Returns null when the entire data
+ * range falls within skip offsets and nothing remains to plan.
+ */
+ @Nullable
+ private Interval computeDataRangeWithSkipOffset(SegmentTimeline timeline, DateTime referenceTime)
+ {
+ final Interval dataBounds = new Interval(
+ timeline.first().getInterval().getStart(),
+ timeline.last().getInterval().getEnd()
+ );
+
+ final Period skipFromNow = template.getSkipOffsetFromNowOrNull();
+ final Period skipFromLatest = template.getSkipOffsetFromLatestOrNull();
+
+ DateTime end = dataBounds.getEnd();
+ if (skipFromNow != null) {
+ end = referenceTime.minus(skipFromNow);
+ } else if (skipFromLatest != null) {
+ end = end.minus(skipFromLatest);
+ }
+ if (end.isBefore(dataBounds.getStart())) {
+ return null;
+ }
+ return new Interval(dataBounds.getStart(), end);
+ }
+
+ /**
+ * Builds the skip-offset resolution stamp for the plan. Returns null when no skip offset is configured.
+ */
+ @Nullable
+ private ReindexingPlan.SkipOffsetResolution buildSkipOffsetResolution(SegmentTimeline timeline, DateTime referenceTime)
+ {
+ final Period skipFromNow = template.getSkipOffsetFromNowOrNull();
+ if (skipFromNow != null) {
+ return new ReindexingPlan.SkipOffsetResolution(
+ ReindexingPlan.SkipOffsetResolution.Type.FROM_NOW,
+ skipFromNow,
+ referenceTime.minus(skipFromNow)
+ );
+ }
+ final Period skipFromLatest = template.getSkipOffsetFromLatestOrNull();
+ if (skipFromLatest != null) {
+ return new ReindexingPlan.SkipOffsetResolution(
+ ReindexingPlan.SkipOffsetResolution.Type.FROM_LATEST,
+ skipFromLatest,
+ timeline.last().getInterval().getEnd().minus(skipFromLatest)
+ );
+ }
+ return null;
+ }
+
+ private static ReindexingPlan.PlannedInterval noRuleEntry(
+ Interval interval,
+ Interval originalInterval,
+ ReindexingPlan.IntervalDisposition disposition,
+ IntervalPartitioningInfo info
+ )
+ {
+ return new ReindexingPlan.PlannedInterval(
+ interval,
+ originalInterval,
+ disposition,
+ info.getSourceRule(),
+ info.isRuleSynthetic(),
+ info.getGranularity(),
+ 0,
+ null,
+ Collections.emptyList()
+ );
+ }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingTimelineView.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingTimelineView.java
new file mode 100644
index 000000000000..936a9ae56d90
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingTimelineView.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.server.compaction.ReindexingDataSchemaRule;
+import org.apache.druid.server.compaction.ReindexingDeletionRule;
+import org.apache.druid.server.compaction.ReindexingIndexSpecRule;
+import org.apache.druid.server.compaction.ReindexingPartitioningRule;
+import org.apache.druid.server.compaction.ReindexingRule;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Public API DTO describing the timeline of search intervals and their associated reindexing
+ * configurations for a cascading reindexing supervisor. Surfaced through the
+ * {@code /supervisor/{id}/reindexingTimeline} endpoint so operators (and the web console)
+ * can visualize how rules are applied across time.
+ *
+ * Always produced via {@link #fromPlan(ReindexingPlan)} so the API view is a pure projection
+ * of the same plan that drives job creation — the two cannot drift.
+ */
+public class ReindexingTimelineView
+{
+ private final String dataSource;
+ private final DateTime referenceTime;
+ @Nullable
+ private final SkipOffsetInfo skipOffset;
+ private final List intervals;
+ @Nullable
+ private final ValidationError validationError;
+
+ @JsonCreator
+ public ReindexingTimelineView(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("referenceTime") DateTime referenceTime,
+ @JsonProperty("skipOffset") @Nullable SkipOffsetInfo skipOffset,
+ @JsonProperty("intervals") List intervals,
+ @JsonProperty("validationError") @Nullable ValidationError validationError
+ )
+ {
+ this.dataSource = dataSource;
+ this.referenceTime = referenceTime;
+ this.skipOffset = skipOffset;
+ this.intervals = Collections.unmodifiableList(intervals);
+ this.validationError = validationError;
+ }
+
+ /**
+ * Projects a {@link ReindexingPlan} into the API view. Intervals that have rules applied
+ * are emitted with their resolved config; intervals fully eclipsed by a skip-offset boundary
+ * are emitted with zero rules so the UI can render the skipped span. Intervals with no data
+ * overlap (a job-path concern) are not surfaced — operators see only ranges that would
+ * actually be compacted or that are explicitly skipped.
+ */
+ static ReindexingTimelineView fromPlan(ReindexingPlan plan)
+ {
+ final List intervalConfigs = new ArrayList<>();
+ for (ReindexingPlan.PlannedInterval planned : plan.getIntervals()) {
+ switch (planned.getDisposition()) {
+ case INCLUDED:
+ case TRUNCATED:
+ if (planned.getRuleCount() > 0) {
+ intervalConfigs.add(new IntervalConfig(
+ planned.getInterval(),
+ planned.getRuleCount(),
+ planned.getConfig(),
+ planned.getAppliedRules()
+ ));
+ }
+ break;
+ case SKIPPED_BEYOND_BOUNDARY:
+ intervalConfigs.add(new IntervalConfig(
+ planned.getInterval(),
+ 0,
+ null,
+ Collections.emptyList()
+ ));
+ break;
+ case SKIPPED_NO_DATA:
+ // Operator-visible timeline only shows ranges that would be compacted or are
+ // explicitly skipped by configuration. Ranges with no underlying data are omitted.
+ break;
+ }
+ }
+
+ return new ReindexingTimelineView(
+ plan.getDataSource(),
+ plan.getReferenceTime(),
+ SkipOffsetInfo.fromPlan(plan.getSkipOffset()),
+ intervalConfigs,
+ ValidationError.fromPlan(plan.getValidationError())
+ );
+ }
+
+ @JsonProperty
+ public String getDataSource()
+ {
+ return dataSource;
+ }
+
+ @JsonProperty
+ public DateTime getReferenceTime()
+ {
+ return referenceTime;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public SkipOffsetInfo getSkipOffset()
+ {
+ return skipOffset;
+ }
+
+ @JsonProperty
+ public List getIntervals()
+ {
+ return intervals;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public ValidationError getValidationError()
+ {
+ return validationError;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ReindexingTimelineView that = (ReindexingTimelineView) o;
+ return Objects.equals(dataSource, that.dataSource)
+ && Objects.equals(referenceTime, that.referenceTime)
+ && Objects.equals(skipOffset, that.skipOffset)
+ && Objects.equals(intervals, that.intervals)
+ && Objects.equals(validationError, that.validationError);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(dataSource, referenceTime, skipOffset, intervals, validationError);
+ }
+
+ /**
+ * Describes a granularity-ordering or configuration error encountered while building the timeline.
+ * Surfaced in the response body rather than as a 5xx so clients can render meaningful messages.
+ */
+ public static class ValidationError
+ {
+ private final String errorType;
+ private final String message;
+ @Nullable
+ private final String olderInterval;
+ @Nullable
+ private final String olderGranularity;
+ @Nullable
+ private final String newerInterval;
+ @Nullable
+ private final String newerGranularity;
+
+ @JsonCreator
+ public ValidationError(
+ @JsonProperty("errorType") String errorType,
+ @JsonProperty("message") String message,
+ @JsonProperty("olderInterval") @Nullable String olderInterval,
+ @JsonProperty("olderGranularity") @Nullable String olderGranularity,
+ @JsonProperty("newerInterval") @Nullable String newerInterval,
+ @JsonProperty("newerGranularity") @Nullable String newerGranularity
+ )
+ {
+ this.errorType = errorType;
+ this.message = message;
+ this.olderInterval = olderInterval;
+ this.olderGranularity = olderGranularity;
+ this.newerInterval = newerInterval;
+ this.newerGranularity = newerGranularity;
+ }
+
+ @Nullable
+ static ValidationError fromPlan(@Nullable ReindexingPlan.PlanValidationError error)
+ {
+ if (error == null) {
+ return null;
+ }
+ return new ValidationError(
+ error.getType().name(),
+ error.getMessage(),
+ error.getOlderInterval() == null ? null : error.getOlderInterval().toString(),
+ error.getOlderGranularity() == null ? null : error.getOlderGranularity().toString(),
+ error.getNewerInterval() == null ? null : error.getNewerInterval().toString(),
+ error.getNewerGranularity() == null ? null : error.getNewerGranularity().toString()
+ );
+ }
+
+ @JsonProperty
+ public String getErrorType()
+ {
+ return errorType;
+ }
+
+ @JsonProperty
+ public String getMessage()
+ {
+ return message;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public String getOlderInterval()
+ {
+ return olderInterval;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public String getOlderGranularity()
+ {
+ return olderGranularity;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public String getNewerInterval()
+ {
+ return newerInterval;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public String getNewerGranularity()
+ {
+ return newerGranularity;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ValidationError that = (ValidationError) o;
+ return Objects.equals(errorType, that.errorType)
+ && Objects.equals(message, that.message)
+ && Objects.equals(olderInterval, that.olderInterval)
+ && Objects.equals(olderGranularity, that.olderGranularity)
+ && Objects.equals(newerInterval, that.newerInterval)
+ && Objects.equals(newerGranularity, that.newerGranularity);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(errorType, message, olderInterval, olderGranularity, newerInterval, newerGranularity);
+ }
+ }
+
+ /**
+ * Describes the configured skip offset that was applied to this view. Only present in the
+ * response when the configured skip offset was actually resolved against the live segment
+ * timeline — operators don't see a "configured but not applied" state because the scheduler
+ * short-circuits to an empty timeline when no segments exist.
+ */
+ public static class SkipOffsetInfo
+ {
+ private final String type;
+ private final Period period;
+ private final DateTime effectiveEndTime;
+
+ @JsonCreator
+ public SkipOffsetInfo(
+ @JsonProperty("type") String type,
+ @JsonProperty("period") Period period,
+ @JsonProperty("effectiveEndTime") DateTime effectiveEndTime
+ )
+ {
+ this.type = type;
+ this.period = period;
+ this.effectiveEndTime = effectiveEndTime;
+ }
+
+ @Nullable
+ static SkipOffsetInfo fromPlan(@Nullable ReindexingPlan.SkipOffsetResolution resolution)
+ {
+ if (resolution == null) {
+ return null;
+ }
+ final String typeName = resolution.getType() == ReindexingPlan.SkipOffsetResolution.Type.FROM_NOW
+ ? "skipOffsetFromNow"
+ : "skipOffsetFromLatest";
+ return new SkipOffsetInfo(typeName, resolution.getPeriod(), resolution.getEffectiveEndTime());
+ }
+
+ @JsonProperty
+ public String getType()
+ {
+ return type;
+ }
+
+ @JsonProperty
+ public Period getPeriod()
+ {
+ return period;
+ }
+
+ @JsonProperty
+ public DateTime getEffectiveEndTime()
+ {
+ return effectiveEndTime;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SkipOffsetInfo that = (SkipOffsetInfo) o;
+ return Objects.equals(type, that.type)
+ && Objects.equals(period, that.period)
+ && Objects.equals(effectiveEndTime, that.effectiveEndTime);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(type, period, effectiveEndTime);
+ }
+ }
+
+ /**
+ * A single search interval and its resolved reindexing configuration. {@code ruleCount==0}
+ * with a null {@code config} indicates a span that was eclipsed by the skip-offset boundary
+ * (no compaction would occur in that span).
+ */
+ public static class IntervalConfig
+ {
+ private final Interval interval;
+ private final int ruleCount;
+ @Nullable
+ private final DataSourceCompactionConfig config;
+ private final List appliedRules;
+
+ @JsonCreator
+ public IntervalConfig(
+ @JsonProperty("interval") Interval interval,
+ @JsonProperty("ruleCount") int ruleCount,
+ @JsonProperty("config") @Nullable DataSourceCompactionConfig config,
+ @JsonProperty("appliedRules") List appliedRules
+ )
+ {
+ this.interval = interval;
+ this.ruleCount = ruleCount;
+ this.config = config;
+ this.appliedRules = Collections.unmodifiableList(appliedRules);
+ }
+
+ @JsonProperty
+ public Interval getInterval()
+ {
+ return interval;
+ }
+
+ @JsonProperty
+ public int getRuleCount()
+ {
+ return ruleCount;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public DataSourceCompactionConfig getConfig()
+ {
+ return config;
+ }
+
+ @JsonProperty
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+ @JsonSubTypes({
+ @JsonSubTypes.Type(value = ReindexingDataSchemaRule.class, name = "dataSchema"),
+ @JsonSubTypes.Type(value = ReindexingDeletionRule.class, name = "deletion"),
+ @JsonSubTypes.Type(value = ReindexingPartitioningRule.class, name = "partitioning"),
+ @JsonSubTypes.Type(value = ReindexingIndexSpecRule.class, name = "indexSpec"),
+ })
+ public List getAppliedRules()
+ {
+ return appliedRules;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ IntervalConfig that = (IntervalConfig) o;
+ return ruleCount == that.ruleCount
+ && Objects.equals(interval, that.interval)
+ && Objects.equals(config, that.config)
+ && Objects.equals(appliedRules, that.appliedRules);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(interval, ruleCount, config, appliedRules);
+ }
+ }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index aff9edf19af9..c45d631b6c4d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -36,9 +36,12 @@
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.error.DruidException;
+import org.apache.druid.indexing.compact.CompactionScheduler;
+import org.apache.druid.indexing.compact.ReindexingTimelineView;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.http.security.SupervisorResourceFilter;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.segment.incremental.ParseExceptionReport;
@@ -52,6 +55,7 @@
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.utils.CollectionUtils;
+import org.joda.time.DateTime;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -100,6 +104,7 @@ public class SupervisorResource
private final ObjectMapper objectMapper;
private final AuditManager auditManager;
private final AuthConfig authConfig;
+ private final CompactionScheduler compactionScheduler;
@Inject
public SupervisorResource(
@@ -107,7 +112,8 @@ public SupervisorResource(
AuthorizerMapper authorizerMapper,
ObjectMapper objectMapper,
AuthConfig authConfig,
- AuditManager auditManager
+ AuditManager auditManager,
+ CompactionScheduler compactionScheduler
)
{
this.taskMaster = taskMaster;
@@ -115,6 +121,7 @@ public SupervisorResource(
this.objectMapper = objectMapper;
this.authConfig = authConfig;
this.auditManager = auditManager;
+ this.compactionScheduler = compactionScheduler;
}
@POST
@@ -362,6 +369,68 @@ public Response getAllTaskStats(
);
}
+ @GET
+ @Path("/{id}/reindexingTimeline")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(SupervisorResourceFilter.class)
+ public Response getReindexingTimeline(
+ @PathParam("id") final String id,
+ @QueryParam("referenceTime") @Nullable final String referenceTimeStr
+ )
+ {
+ final DateTime referenceTime;
+ try {
+ referenceTime = referenceTimeStr == null ? DateTimes.nowUtc() : DateTimes.of(referenceTimeStr);
+ }
+ catch (IllegalArgumentException e) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(ImmutableMap.of(
+ "error",
+ StringUtils.format(
+ "Reference time[%s] is in invalid format. Use ISO 8601 standard format.",
+ referenceTimeStr
+ )
+ ))
+ .build();
+ }
+
+ return asLeaderWithSupervisorManager(
+ manager -> {
+ final Optional specOptional = manager.getSupervisorSpec(id);
+ if (!specOptional.isPresent()) {
+ return Response.status(Response.Status.NOT_FOUND)
+ .entity(ImmutableMap.of("error", StringUtils.format("Supervisor[%s] does not exist", id)))
+ .build();
+ }
+ try {
+ final ReindexingTimelineView view = compactionScheduler.previewReindexingTimeline(
+ specOptional.get(),
+ referenceTime
+ );
+ return Response.ok(view).build();
+ }
+ catch (DruidException e) {
+ return Response.status(httpStatusFor(e))
+ .entity(ImmutableMap.of("error", e.getMessage()))
+ .build();
+ }
+ }
+ );
+ }
+
+ private static Response.Status httpStatusFor(DruidException e)
+ {
+ switch (e.getCategory()) {
+ case NOT_FOUND:
+ return Response.Status.NOT_FOUND;
+ case INVALID_INPUT:
+ case UNSUPPORTED:
+ return Response.Status.BAD_REQUEST;
+ default:
+ return Response.Status.INTERNAL_SERVER_ERROR;
+ }
+ }
+
@GET
@Path("/{id}/parseErrors")
@Produces(MediaType.APPLICATION_JSON)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java
index c3325725f16f..ba17494887d4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java
@@ -1961,7 +1961,7 @@ private ReindexingRuleProvider createMockProvider(List periods)
}
ReindexingRuleProvider mockProvider = EasyMock.createMock(ReindexingRuleProvider.class);
- EasyMock.expect(mockProvider.isReady()).andReturn(true);
+ EasyMock.expect(mockProvider.isReady()).andReturn(true).anyTimes();
EasyMock.expect(mockProvider.getPartitioningRules()).andReturn(partitioningRules).anyTimes();
// Return a fresh stream on each call to avoid "stream has already been operated upon or closed" errors
EasyMock.expect(mockProvider.streamAllRules()).andAnswer(() -> partitioningRules.stream().map(r -> (ReindexingRule) r)).anyTimes();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/ReindexingPlannerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/ReindexingPlannerTest.java
new file mode 100644
index 000000000000..d5464c8d9312
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/ReindexingPlannerTest.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.filter.EqualityFilter;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.server.compaction.InlineReindexingRuleProvider;
+import org.apache.druid.server.compaction.ReindexingDataSchemaRule;
+import org.apache.druid.server.compaction.ReindexingDeletionRule;
+import org.apache.druid.server.compaction.ReindexingPartitioningRule;
+import org.apache.druid.server.compaction.ReindexingRule;
+import org.apache.druid.server.compaction.ReindexingRuleProvider;
+import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentTimeline;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Unit tests for {@link ReindexingPlanner}. The planner requires a non-null, non-empty
+ * {@link SegmentTimeline} — callers short-circuit on missing data before invoking it — so every
+ * test passes a constructed timeline that covers the rule range under test.
+ */
+public class ReindexingPlannerTest extends InitializedNullHandlingTest
+{
+ private static final DateTime REFERENCE_TIME = DateTimes.of("2025-02-01T00:00:00Z");
+ // A timeline that covers everything the rules in these tests will look at.
+ private static final SegmentTimeline WIDE_TIMELINE = createTimeline(
+ DateTimes.of("2024-01-01T00:00:00Z"),
+ DateTimes.of("2025-01-31T23:00:00Z")
+ );
+
+ @Test
+ public void test_plan_comprehensive()
+ {
+ final ReindexingPartitioningRule partitioning7d = new ReindexingPartitioningRule(
+ "seg-gran-7d",
+ null,
+ Period.days(7),
+ Granularities.HOUR,
+ new DynamicPartitionsSpec(5000000, null),
+ null
+ );
+ final ReindexingPartitioningRule partitioning30d = new ReindexingPartitioningRule(
+ "seg-gran-30d",
+ null,
+ Period.days(30),
+ Granularities.DAY,
+ new DynamicPartitionsSpec(5000000, null),
+ null
+ );
+ final ReindexingDataSchemaRule dataSchema15d = new ReindexingDataSchemaRule(
+ "data-schema-15d",
+ null,
+ Period.days(15),
+ new UserCompactionTaskDimensionsConfig(null),
+ new AggregatorFactory[]{new CountAggregatorFactory("count")},
+ Granularities.MINUTE,
+ true,
+ null
+ );
+ final ReindexingDeletionRule deletion10d = new ReindexingDeletionRule(
+ "deletion-10d",
+ null,
+ Period.days(10),
+ new EqualityFilter("country", ColumnType.STRING, "US", null),
+ null
+ );
+ final ReindexingDeletionRule deletion20d = new ReindexingDeletionRule(
+ "deletion-20d",
+ null,
+ Period.days(20),
+ new EqualityFilter("device", ColumnType.STRING, "mobile", null),
+ null
+ );
+
+ final ReindexingRuleProvider provider = InlineReindexingRuleProvider
+ .builder()
+ .partitioningRules(List.of(partitioning7d, partitioning30d))
+ .dataSchemaRules(List.of(dataSchema15d))
+ .deletionRules(List.of(deletion10d, deletion20d))
+ .build();
+
+ final CascadingReindexingTemplate template = newTemplate(provider, null);
+ final ReindexingPlan plan = new ReindexingPlanner(template).plan(REFERENCE_TIME, WIDE_TIMELINE);
+
+ Assertions.assertEquals("testDS", plan.getDataSource());
+ Assertions.assertEquals(REFERENCE_TIME, plan.getReferenceTime());
+ Assertions.assertNull(plan.getValidationError());
+ Assertions.assertNull(plan.getSkipOffset());
+ Assertions.assertTrue(plan.getIntervals().size() >= 2, "Expected at least 2 intervals");
+
+ boolean foundPartitioning = false;
+ boolean foundDataSchema = false;
+ boolean foundDeletion = false;
+
+ for (ReindexingPlan.PlannedInterval planned : plan.getIntervals()) {
+ Assertions.assertNotNull(planned.getInterval());
+ if (planned.getRuleCount() == 0) {
+ continue; // SKIPPED_NO_DATA or no rules apply — outside the comprehensive coverage check.
+ }
+ Assertions.assertNotNull(planned.getConfig());
+ Assertions.assertEquals(planned.getRuleCount(), planned.getAppliedRules().size());
+ Assertions.assertEquals(
+ ReindexingPlan.IntervalDisposition.INCLUDED,
+ planned.getDisposition()
+ );
+
+ for (ReindexingRule rule : planned.getAppliedRules()) {
+ if (rule instanceof ReindexingPartitioningRule) {
+ foundPartitioning = true;
+ } else if (rule instanceof ReindexingDataSchemaRule) {
+ foundDataSchema = true;
+ } else if (rule instanceof ReindexingDeletionRule) {
+ foundDeletion = true;
+ }
+ }
+ }
+
+ Assertions.assertTrue(foundPartitioning, "Plan should contain a segmentGranularity rule");
+ Assertions.assertTrue(foundDataSchema, "Plan should contain a dataSchema rule");
+ Assertions.assertTrue(foundDeletion, "Plan should contain a deletion rule");
+ }
+
+ @Test
+ public void test_plan_propagatesTemplateTuningConfig()
+ {
+ final ReindexingRuleProvider provider = InlineReindexingRuleProvider
+ .builder()
+ .partitioningRules(List.of(
+ new ReindexingPartitioningRule(
+ "seg-7d",
+ null,
+ Period.days(7),
+ Granularities.HOUR,
+ new DynamicPartitionsSpec(5000000, null),
+ null
+ )
+ ))
+ .build();
+
+ final UserCompactionTaskQueryTuningConfig templateTuningConfig = UserCompactionTaskQueryTuningConfig.builder()
+ .maxRowsInMemory(12345)
+ .maxNumConcurrentSubTasks(7)
+ .maxRetry(9)
+ .build();
+
+ final CascadingReindexingTemplate template = newTemplate(provider, templateTuningConfig);
+ final ReindexingPlan plan = new ReindexingPlanner(template).plan(REFERENCE_TIME, WIDE_TIMELINE);
+
+ boolean sawConfig = false;
+ for (ReindexingPlan.PlannedInterval planned : plan.getIntervals()) {
+ if (planned.getConfig() == null) {
+ continue;
+ }
+ sawConfig = true;
+ final UserCompactionTaskQueryTuningConfig effective = planned.getConfig().getTuningConfig();
+ Assertions.assertNotNull(effective);
+ Assertions.assertEquals(Integer.valueOf(12345), effective.getMaxRowsInMemory());
+ Assertions.assertEquals(Integer.valueOf(7), effective.getMaxNumConcurrentSubTasks());
+ Assertions.assertEquals(Integer.valueOf(9), effective.getMaxRetry());
+ }
+ Assertions.assertTrue(sawConfig, "Expected at least one interval with a non-null config");
+ }
+
+ @Test
+ public void test_plan_skipOffsetFromNow_skipsBeyondBoundary()
+ {
+ final DateTime referenceTime = DateTimes.of("2025-01-29T00:00:00Z");
+ final Period skipOffset = Period.days(10);
+
+ final ReindexingRuleProvider provider = InlineReindexingRuleProvider
+ .builder()
+ .partitioningRules(List.of(
+ new ReindexingPartitioningRule("seg-3d", null, Period.days(3), Granularities.HOUR, new DynamicPartitionsSpec(5000000, null), null),
+ new ReindexingPartitioningRule("seg-30d", null, Period.days(30), Granularities.DAY, new DynamicPartitionsSpec(5000000, null), null)
+ ))
+ .build();
+
+ final CascadingReindexingTemplate template = new CascadingReindexingTemplate(
+ "testDS",
+ null,
+ null,
+ provider,
+ null,
+ null,
+ skipOffset,
+ Granularities.DAY,
+ new DynamicPartitionsSpec(5000000, null),
+ null,
+ null
+ );
+
+ final SegmentTimeline timeline = createTimeline(
+ DateTimes.of("2024-01-01T00:00:00Z"),
+ DateTimes.of("2025-01-28T23:00:00Z")
+ );
+ final ReindexingPlan plan = new ReindexingPlanner(template).plan(referenceTime, timeline);
+
+ Assertions.assertNotNull(plan.getSkipOffset());
+ Assertions.assertEquals(ReindexingPlan.SkipOffsetResolution.Type.FROM_NOW, plan.getSkipOffset().getType());
+ Assertions.assertEquals(skipOffset, plan.getSkipOffset().getPeriod());
+ Assertions.assertEquals(referenceTime.minus(skipOffset), plan.getSkipOffset().getEffectiveEndTime());
+
+ for (ReindexingPlan.PlannedInterval planned : plan.getIntervals()) {
+ if (planned.getOriginalInterval().getEnd().isAfter(plan.getSkipOffset().getEffectiveEndTime())) {
+ Assertions.assertTrue(
+ planned.getDisposition() == ReindexingPlan.IntervalDisposition.TRUNCATED
+ || planned.getDisposition() == ReindexingPlan.IntervalDisposition.SKIPPED_BEYOND_BOUNDARY,
+ "Interval beyond skip-offset boundary should be TRUNCATED or SKIPPED_BEYOND_BOUNDARY"
+ );
+ }
+ }
+ }
+
+ @Test
+ public void test_plan_skipOffsetFromLatest_anchorsOnData()
+ {
+ final DateTime referenceTime = DateTimes.of("2025-01-29T00:00:00Z");
+ final DateTime latestData = DateTimes.of("2025-01-28T12:00:00Z");
+
+ final ReindexingRuleProvider provider = InlineReindexingRuleProvider
+ .builder()
+ .partitioningRules(List.of(
+ new ReindexingPartitioningRule(
+ "seg-7d",
+ null,
+ Period.days(7),
+ Granularities.DAY,
+ new DynamicPartitionsSpec(5000000, null),
+ null
+ )
+ ))
+ .build();
+
+ final CascadingReindexingTemplate template = new CascadingReindexingTemplate(
+ "testDS",
+ null,
+ null,
+ provider,
+ null,
+ Period.hours(6),
+ null,
+ Granularities.DAY,
+ new DynamicPartitionsSpec(5000000, null),
+ null,
+ null
+ );
+
+ final SegmentTimeline timeline = createTimeline(
+ DateTimes.of("2024-12-01T00:00:00Z"),
+ latestData
+ );
+
+ final ReindexingPlan plan = new ReindexingPlanner(template).plan(referenceTime, timeline);
+
+ Assertions.assertNotNull(plan.getSkipOffset());
+ Assertions.assertEquals(ReindexingPlan.SkipOffsetResolution.Type.FROM_LATEST, plan.getSkipOffset().getType());
+ Assertions.assertEquals(latestData.minus(Period.hours(6)), plan.getSkipOffset().getEffectiveEndTime());
+ }
+
+ @Test
+ public void test_plan_validationError_invalidGranularityTimeline()
+ {
+ final DateTime referenceTime = DateTimes.of("2025-01-29T16:15:00Z");
+
+ final ReindexingRuleProvider provider = InlineReindexingRuleProvider
+ .builder()
+ .partitioningRules(List.of(
+ new ReindexingPartitioningRule("hour-rule", null, Period.days(30), Granularities.HOUR, new DynamicPartitionsSpec(5000000, null), null),
+ new ReindexingPartitioningRule("day-rule", null, Period.days(90), Granularities.DAY, new DynamicPartitionsSpec(5000000, null), null)
+ ))
+ .dataSchemaRules(List.of(
+ new ReindexingDataSchemaRule(
+ "metrics-7d",
+ null,
+ Period.days(7),
+ null,
+ new AggregatorFactory[]{new CountAggregatorFactory("count")},
+ null,
+ null,
+ null
+ )
+ ))
+ .build();
+
+ final CascadingReindexingTemplate template = new CascadingReindexingTemplate(
+ "testDS",
+ null,
+ null,
+ provider,
+ null,
+ null,
+ null,
+ Granularities.MONTH,
+ new DynamicPartitionsSpec(5000000, null),
+ null,
+ null
+ );
+
+ final ReindexingPlan plan = new ReindexingPlanner(template).plan(referenceTime, WIDE_TIMELINE);
+
+ Assertions.assertNotNull(plan.getValidationError());
+ Assertions.assertEquals(
+ ReindexingPlan.PlanValidationError.Type.INVALID_GRANULARITY_TIMELINE,
+ plan.getValidationError().getType()
+ );
+ Assertions.assertTrue(plan.getValidationError().getMessage().contains("Invalid segment granularity timeline"));
+ Assertions.assertNotNull(plan.getValidationError().getOlderInterval());
+ Assertions.assertNotNull(plan.getValidationError().getOlderGranularity());
+ Assertions.assertNotNull(plan.getValidationError().getNewerInterval());
+ Assertions.assertNotNull(plan.getValidationError().getNewerGranularity());
+ Assertions.assertTrue(plan.getIntervals().isEmpty(), "Intervals should be empty on validation error");
+ }
+
+ @Test
+ public void test_plan_ruleProviderNotReady_returnsEmptyPlan()
+ {
+ final ReindexingRuleProvider notReadyProvider = EasyMock.createMock(ReindexingRuleProvider.class);
+ EasyMock.expect(notReadyProvider.isReady()).andReturn(false).anyTimes();
+ EasyMock.expect(notReadyProvider.getType()).andReturn("mock").anyTimes();
+ EasyMock.replay(notReadyProvider);
+
+ final CascadingReindexingTemplate template = newTemplate(notReadyProvider, null);
+ final ReindexingPlan plan = new ReindexingPlanner(template).plan(REFERENCE_TIME, WIDE_TIMELINE);
+
+ Assertions.assertEquals("testDS", plan.getDataSource());
+ Assertions.assertEquals(REFERENCE_TIME, plan.getReferenceTime());
+ Assertions.assertTrue(plan.getIntervals().isEmpty());
+ Assertions.assertNull(plan.getValidationError());
+ Assertions.assertNull(plan.getSkipOffset());
+ }
+
+ @Test
+ public void test_plan_intervalsOutsideDataRangeAreSkipped()
+ {
+ final ReindexingRuleProvider provider = InlineReindexingRuleProvider
+ .builder()
+ .partitioningRules(List.of(
+ new ReindexingPartitioningRule(
+ "seg-7d",
+ null,
+ Period.days(7),
+ Granularities.DAY,
+ new DynamicPartitionsSpec(5000000, null),
+ null
+ ),
+ new ReindexingPartitioningRule(
+ "seg-90d",
+ null,
+ Period.days(90),
+ Granularities.DAY,
+ new DynamicPartitionsSpec(5000000, null),
+ null
+ )
+ ))
+ .build();
+
+ final CascadingReindexingTemplate template = newTemplate(provider, null);
+
+ // Timeline covers only a thin slice — most rule intervals will have no data overlap.
+ final SegmentTimeline timeline = createTimeline(
+ DateTimes.of("2024-12-01T00:00:00Z"),
+ DateTimes.of("2024-12-15T00:00:00Z")
+ );
+
+ final ReindexingPlan plan = new ReindexingPlanner(template).plan(REFERENCE_TIME, timeline);
+
+ boolean sawSkippedNoData = false;
+ for (ReindexingPlan.PlannedInterval planned : plan.getIntervals()) {
+ if (planned.getDisposition() == ReindexingPlan.IntervalDisposition.SKIPPED_NO_DATA) {
+ sawSkippedNoData = true;
+ Assertions.assertNull(planned.getConfig());
+ Assertions.assertEquals(0, planned.getRuleCount());
+ Assertions.assertFalse(planned.isJobEligible());
+ }
+ }
+ Assertions.assertTrue(sawSkippedNoData, "Expected at least one interval with disposition SKIPPED_NO_DATA");
+ }
+
+ private static CascadingReindexingTemplate newTemplate(
+ ReindexingRuleProvider provider,
+ UserCompactionTaskQueryTuningConfig tuningConfig
+ )
+ {
+ return new CascadingReindexingTemplate(
+ "testDS",
+ null,
+ null,
+ provider,
+ null,
+ null,
+ null,
+ Granularities.DAY,
+ new DynamicPartitionsSpec(5000000, null),
+ null,
+ tuningConfig
+ );
+ }
+
+ private static SegmentTimeline createTimeline(DateTime start, DateTime end)
+ {
+ final DataSegment segment = DataSegment.builder()
+ .dataSource("testDS")
+ .interval(new Interval(start, end))
+ .version("v1")
+ .size(1000)
+ .build();
+ return SegmentTimeline.forSegments(Collections.singletonList(segment));
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/ReindexingTimelineViewTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/ReindexingTimelineViewTest.java
new file mode 100644
index 000000000000..4475f552454d
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/ReindexingTimelineViewTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.guice.SupervisorModule;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.server.compaction.ReindexingPartitioningRule;
+import org.apache.druid.server.compaction.ReindexingRule;
+import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ReindexingTimelineViewTest
+{
+ private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
+
+ @BeforeEach
+ public void setUp()
+ {
+ OBJECT_MAPPER.registerModules(new SupervisorModule().getJacksonModules());
+ }
+
+ @Test
+ public void test_serde() throws Exception
+ {
+ final DateTime referenceTime = DateTimes.of("2025-01-29T16:15:00Z");
+
+ final ReindexingTimelineView.SkipOffsetInfo skipOffset = new ReindexingTimelineView.SkipOffsetInfo(
+ "skipOffsetFromLatest",
+ Period.days(1),
+ DateTimes.of("2025-01-28T16:15:00Z")
+ );
+ final ReindexingTimelineView.ValidationError validationError = new ReindexingTimelineView.ValidationError(
+ "INVALID_GRANULARITY_TIMELINE",
+ "Granularity must decrease over time",
+ "2024-12-01/2025-01-01",
+ "MONTH",
+ "2025-01-01/2025-01-29",
+ "DAY"
+ );
+
+ final ReindexingTimelineView.IntervalConfig skippedInterval = new ReindexingTimelineView.IntervalConfig(
+ Intervals.of("2024-12-01/2025-01-01"),
+ 0,
+ null,
+ Collections.emptyList()
+ );
+ final ReindexingTimelineView.IntervalConfig activeInterval = new ReindexingTimelineView.IntervalConfig(
+ Intervals.of("2025-01-01/2025-01-29"),
+ 1,
+ InlineSchemaDataSourceCompactionConfig.builder().forDataSource("testDS").build(),
+ List.of(new ReindexingPartitioningRule(
+ "day-rule",
+ null,
+ Period.days(7),
+ Granularities.DAY,
+ new DynamicPartitionsSpec(null, null),
+ null
+ ))
+ );
+
+ final ReindexingTimelineView original = new ReindexingTimelineView(
+ "testDS",
+ referenceTime,
+ skipOffset,
+ List.of(skippedInterval, activeInterval),
+ validationError
+ );
+
+ final String json = OBJECT_MAPPER.writeValueAsString(original);
+ final ReindexingTimelineView deserialized = OBJECT_MAPPER.readValue(json, ReindexingTimelineView.class);
+
+ Assertions.assertEquals(original, deserialized);
+ }
+
+ @Test
+ public void test_fromPlan_projectsIntervalsByDisposition()
+ {
+ final DateTime referenceTime = DateTimes.of("2025-01-29T00:00:00Z");
+
+ final ReindexingPartitioningRule partitioningRule = new ReindexingPartitioningRule(
+ "day-rule",
+ null,
+ Period.days(7),
+ Granularities.DAY,
+ new DynamicPartitionsSpec(null, null),
+ null
+ );
+
+ final Interval includedRange = Intervals.of("2024-12-01/2025-01-01");
+ final Interval skippedBoundary = Intervals.of("2025-01-01/2025-01-29");
+ final Interval skippedNoData = Intervals.of("2024-01-01/2024-12-01");
+
+ final ReindexingPlan plan = new ReindexingPlan(
+ "testDS",
+ referenceTime,
+ null,
+ List.of(
+ new ReindexingPlan.PlannedInterval(
+ skippedNoData,
+ skippedNoData,
+ ReindexingPlan.IntervalDisposition.SKIPPED_NO_DATA,
+ partitioningRule,
+ false,
+ Granularities.DAY,
+ 0,
+ null,
+ Collections.emptyList()
+ ),
+ new ReindexingPlan.PlannedInterval(
+ includedRange,
+ includedRange,
+ ReindexingPlan.IntervalDisposition.INCLUDED,
+ partitioningRule,
+ false,
+ Granularities.DAY,
+ 1,
+ InlineSchemaDataSourceCompactionConfig.builder().forDataSource("testDS").build(),
+ List.of((ReindexingRule) partitioningRule)
+ ),
+ new ReindexingPlan.PlannedInterval(
+ skippedBoundary,
+ skippedBoundary,
+ ReindexingPlan.IntervalDisposition.SKIPPED_BEYOND_BOUNDARY,
+ partitioningRule,
+ false,
+ Granularities.DAY,
+ 0,
+ null,
+ Collections.emptyList()
+ )
+ ),
+ null
+ );
+
+ final ReindexingTimelineView view = ReindexingTimelineView.fromPlan(plan);
+
+ // SKIPPED_NO_DATA should be omitted; INCLUDED and SKIPPED_BEYOND_BOUNDARY should be present.
+ Assertions.assertEquals(2, view.getIntervals().size());
+ Assertions.assertEquals(includedRange, view.getIntervals().get(0).getInterval());
+ Assertions.assertEquals(1, view.getIntervals().get(0).getRuleCount());
+ Assertions.assertNotNull(view.getIntervals().get(0).getConfig());
+
+ Assertions.assertEquals(skippedBoundary, view.getIntervals().get(1).getInterval());
+ Assertions.assertEquals(0, view.getIntervals().get(1).getRuleCount());
+ Assertions.assertNull(view.getIntervals().get(1).getConfig());
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 4ccf4659994f..005710503e39 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -26,6 +26,8 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.audit.AuditManager;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.indexing.compact.CompactionScheduler;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
@@ -109,6 +111,9 @@ public class SupervisorResourceTest extends EasyMockSupport
@Mock
private AuditManager auditManager;
+ @Mock
+ private CompactionScheduler compactionScheduler;
+
private SupervisorResource supervisorResource;
@Before
@@ -145,7 +150,8 @@ public Authorizer getAuthorizer(String name)
},
OBJECT_MAPPER,
authConfig,
- auditManager
+ auditManager,
+ compactionScheduler
);
}
@@ -204,9 +210,9 @@ public List getDataSources()
@Override
public void validateSpec()
{
- throw org.apache.druid.error.DruidException
- .forPersona(org.apache.druid.error.DruidException.Persona.USER)
- .ofCategory(org.apache.druid.error.DruidException.Category.INVALID_INPUT)
+ throw DruidException
+ .forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
.build("nope");
}
};
@@ -1434,6 +1440,90 @@ public void testSpecPostMergeCarriesForwardEvenWhenExistingHasOnlyTaskCountMin()
Assert.assertEquals(1, newSpec.getIoConfig().getTaskCount());
}
+ @Test
+ public void test_getReindexingTimeline_returnsTimelineFromScheduler()
+ {
+ final org.apache.druid.indexing.compact.ReindexingTimelineView view =
+ new org.apache.druid.indexing.compact.ReindexingTimelineView(
+ "datasource1",
+ org.apache.druid.java.util.common.DateTimes.of("2025-01-29T00:00:00Z"),
+ null,
+ java.util.Collections.emptyList(),
+ null
+ );
+
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ EasyMock.expect(supervisorManager.getSupervisorSpec("autocompact__datasource1"))
+ .andReturn(Optional.of(new TestSupervisorSpec("autocompact__datasource1", null, null)));
+ EasyMock.expect(compactionScheduler.previewReindexingTimeline(EasyMock.anyObject(), EasyMock.anyObject()))
+ .andReturn(view);
+ replayAll();
+
+ final Response response = supervisorResource.getReindexingTimeline("autocompact__datasource1", null);
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertSame(view, response.getEntity());
+
+ verifyAll();
+ }
+
+ @Test
+ public void test_getReindexingTimeline_supervisorNotFound_returns404()
+ {
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ EasyMock.expect(supervisorManager.getSupervisorSpec("missing-id")).andReturn(Optional.absent());
+ replayAll();
+
+ final Response response = supervisorResource.getReindexingTimeline("missing-id", null);
+ Assert.assertEquals(404, response.getStatus());
+ verifyAll();
+ }
+
+ @Test
+ public void test_getReindexingTimeline_invalidReferenceTime_returns400()
+ {
+ replayAll();
+
+ final Response response = supervisorResource.getReindexingTimeline("any-id", "not-a-date");
+ Assert.assertEquals(400, response.getStatus());
+ @SuppressWarnings("unchecked")
+ Map entity = (Map) response.getEntity();
+ Assert.assertTrue(entity.get("error").toString().contains("invalid format"));
+
+ verifyAll();
+ }
+
+ @Test
+ public void test_getReindexingTimeline_schedulerRejectsSpec_returns400()
+ {
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ EasyMock.expect(supervisorManager.getSupervisorSpec("non-compaction"))
+ .andReturn(Optional.of(new TestSupervisorSpec("non-compaction", null, null)));
+ EasyMock.expect(compactionScheduler.previewReindexingTimeline(EasyMock.anyObject(), EasyMock.anyObject()))
+ .andThrow(DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build("not a compaction supervisor"));
+ replayAll();
+
+ final Response response = supervisorResource.getReindexingTimeline("non-compaction", null);
+ Assert.assertEquals(400, response.getStatus());
+ @SuppressWarnings("unchecked")
+ Map entity = (Map) response.getEntity();
+ Assert.assertTrue(entity.get("error").toString().contains("not a compaction supervisor"));
+
+ verifyAll();
+ }
+
+ @Test
+ public void test_getReindexingTimeline_serviceUnavailable_returns503()
+ {
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
+ replayAll();
+
+ final Response response = supervisorResource.getReindexingTimeline("any-id", null);
+ Assert.assertEquals(503, response.getStatus());
+ verifyAll();
+ }
+
@Test
public void test_handoffTaskGroups_returnsAccepted()
{