diff --git a/docs/api-reference/supervisor-api.md b/docs/api-reference/supervisor-api.md index d321af143020..6a67ffb442ab 100644 --- a/docs/api-reference/supervisor-api.md +++ b/docs/api-reference/supervisor-api.md @@ -1521,6 +1521,138 @@ In the example below, the outer key (`"0"`) is a **task group ID**; the inner ke **For automation clients:** If you need to map task IDs to group IDs (for handoff, draining, or observability), use the `/stats` response keys directly instead of re-deriving group IDs from partition data. This avoids coupling to internal supervisor assignment logic. +### Get reindexing timeline for a supervisor + +Returns the reindexing timeline for a compaction supervisor that uses a cascading reindexing template. The timeline shows how reindexing rules apply across time intervals, including skip-offset resolution and per-interval configuration. + +:::info +This endpoint is only available for compaction supervisors that use a cascading reindexing template. The Overlord fetches the live segment timeline for the datasource and applies any configured `skipOffsetFromNow` or `skipOffsetFromLatest` so the response reflects what would actually be compacted on the next scheduler run. +::: + +#### URL + +`GET` `/druid/indexer/v1/supervisor/{supervisorId}/reindexingTimeline` + +#### Query parameters + +* `referenceTime` (optional) + * Type: String (ISO-8601 datetime) + * The reference time to use for computing the timeline. Defaults to the current UTC time if not specified. + +#### Responses + + + + + + +*Successfully retrieved reindexing timeline* + + + + + +*Supervisor is not a compaction supervisor, does not use a cascading reindexing template, or the `referenceTime` parameter is not a valid ISO-8601 datetime* + + + + + +*Invalid supervisor ID* + + + + +--- + +#### Sample request + +The following example retrieves the reindexing timeline for a compaction supervisor on the datasource `my_datasource`. Compaction supervisor IDs follow the pattern `autocompact__`. + + + + + + +```shell +curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/autocompact__my_datasource/reindexingTimeline?referenceTime=2024-01-15T00:00:00.000Z" +``` + + + + + +```HTTP +GET /druid/indexer/v1/supervisor/autocompact__my_datasource/reindexingTimeline?referenceTime=2024-01-15T00:00:00.000Z HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +
+ View the response + + ```json + { + "dataSource": "my_datasource", + "referenceTime": "2024-01-15T00:00:00.000Z", + "skipOffset": { + "type": "skipOffsetFromNow", + "period": "P1D", + "effectiveEndTime": "2024-01-14T00:00:00.000Z" + }, + "intervals": [ + { + "interval": "2024-01-14T00:00:00.000Z/2024-01-15T00:00:00.000Z", + "ruleCount": 0, + "appliedRules": [] + }, + { + "interval": "2024-01-01T00:00:00.000Z/2024-01-14T00:00:00.000Z", + "ruleCount": 2, + "config": { + "dataSource": "my_datasource", + "taskPriority": 25, + "inputSegmentSizeBytes": 100000000000, + "maxRowsPerSegment": 5000000, + "skipOffsetFromLatest": "PT0S", + "tuningConfig": null, + "taskContext": null, + "granularitySpec": { + "segmentGranularity": "DAY", + "queryGranularity": "HOUR", + "rollup": true + }, + "ioConfig": null + }, + "appliedRules": [ + { + "type": "partitioning", + "id": "compact-to-daily", + "description": "Compact to daily segments for data older than 1 day", + "olderThan": "P1D", + "segmentGranularity": "DAY" + }, + { + "type": "dataSchema", + "id": "rollup-hourly", + "description": "Roll up to hourly query granularity for data older than 1 day", + "olderThan": "P1D", + "queryGranularity": "HOUR", + "rollup": true + } + ] + } + ] + } + ``` +
+ +If the supervisor's datasource has no segments yet, the response includes an empty `intervals` list rather than the configured rule timeline — the rule timeline has no data to be applied against. + ## Audit history An audit history provides a comprehensive log of events, including supervisor configuration, creation, suspension, and modification history. diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java index 34ea243bbbc7..ddbdf2a64d94 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.testing.embedded.compact; +import com.fasterxml.jackson.core.type.TypeReference; import org.apache.druid.catalog.guice.CatalogClientModule; import org.apache.druid.catalog.guice.CatalogCoordinatorModule; import org.apache.druid.common.utils.IdUtils; @@ -41,6 +42,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.compact.CascadingReindexingTemplate; import org.apache.druid.indexing.compact.CompactionSupervisorSpec; +import org.apache.druid.indexing.compact.ReindexingTimelineView; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; @@ -56,6 +58,7 @@ import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.filter.EqualityFilter; import org.apache.druid.query.filter.NotDimFilter; +import org.apache.druid.rpc.RequestBuilder; import org.apache.druid.rpc.UpdateResponse; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.VirtualColumns; @@ -102,6 +105,7 @@ import org.hamcrest.Matcher; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; +import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; @@ -443,6 +447,98 @@ public void test_cascadingCompactionTemplate_multiplePeriodsApplyDifferentCompac verifyEventCountOlderThan(Period.days(7), "item", "hat", 0); } + @Test + public void test_getReindexingTimeline_returnsTimelineForCascadingSupervisor() + { + final ReindexingPartitioningRule partitioningRule = new ReindexingPartitioningRule( + "dayRule", + "Compact to DAY granularity for data older than 7 days", + Period.days(7), + Granularities.DAY, + new DynamicPartitionsSpec(null, null), + null + ); + + final CascadingReindexingTemplate template = new CascadingReindexingTemplate( + dataSource, + null, + null, + InlineReindexingRuleProvider.builder() + .partitioningRules(List.of(partitioningRule)) + .build(), + null, + null, + null, + Granularities.HOUR, + new DynamicPartitionsSpec(null, null), + null, + null + ); + runCompactionWithSpec(template); + + final String supervisorId = CompactionSupervisorSpec.getSupervisorIdForDatasource(dataSource); + final DateTime referenceTime = DateTimes.nowUtc(); + final String url = StringUtils.format( + "/druid/indexer/v1/supervisor/%s/reindexingTimeline?referenceTime=%s", + StringUtils.urlEncode(supervisorId), + StringUtils.urlEncode(referenceTime.toString()) + ); + + final ReindexingTimelineView timeline = cluster.callApi().serviceClient().onLeaderOverlord( + mapper -> new RequestBuilder(HttpMethod.GET, url), + new TypeReference<>() {} + ); + + Assertions.assertNotNull(timeline); + Assertions.assertEquals(dataSource, timeline.getDataSource()); + Assertions.assertEquals(referenceTime, timeline.getReferenceTime()); + Assertions.assertNull(timeline.getValidationError(), "Timeline should have no validation errors"); + } + + @Test + public void test_getReindexingTimeline_returns400ForNonCascadingSupervisor() + { + final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord( + o -> o.updateClusterCompactionConfig( + new ClusterCompactionConfig(1.0, 100, null, true, CompactionEngine.MSQ, true) + ) + ); + Assertions.assertTrue(updateResponse.isSuccess()); + + final InlineSchemaDataSourceCompactionConfig inlineConfig = + InlineSchemaDataSourceCompactionConfig + .builder() + .forDataSource(dataSource) + .withSkipOffsetFromLatest(Period.seconds(0)) + .withGranularitySpec( + new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null) + ) + .withTuningConfig( + UserCompactionTaskQueryTuningConfig.builder().partitionsSpec(new DynamicPartitionsSpec(null, null)).build() + ) + .build(); + + runCompactionWithSpec(inlineConfig); + + final String supervisorId = CompactionSupervisorSpec.getSupervisorIdForDatasource(dataSource); + final String url = StringUtils.format( + "/druid/indexer/v1/supervisor/%s/reindexingTimeline", + StringUtils.urlEncode(supervisorId) + ); + + final RuntimeException exception = Assertions.assertThrows( + RuntimeException.class, + () -> cluster.callApi().serviceClient().onLeaderOverlord( + mapper -> new RequestBuilder(HttpMethod.GET, url), + new TypeReference() {} + ) + ); + Assertions.assertTrue( + exception.getMessage().contains("400 Bad Request"), + "Expected 400 Bad Request in error message but got: " + exception.getMessage() + ); + } + @Test public void test_cascadingReindexing_withVirtualColumnOnNestedData_filtersCorrectly() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java index c50c7ec74025..7dab8a1fc674 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java @@ -299,26 +299,14 @@ public CompactionConfigValidationResult validate(ClusterCompactionConfig cluster .orElse(CompactionConfigValidationResult.success()); } - /** - * Checks if the given interval's end time is after the specified boundary. - * Used to determine if intervals should be skipped based on skip offset configuration. - * - * @param interval the interval to check - * @param boundary the boundary time to compare against - * @return true if the interval ends after the boundary - */ - private static boolean intervalEndsAfter(Interval interval, DateTime boundary) - { - return interval.getEnd().isAfter(boundary); - } - @Override public List createCompactionJobs( DruidInputSource source, CompactionJobParams jobParams ) { - // Check if the rule provider is ready before attempting to create jobs + + // Abort if rule provider is not ready yet (e.g., if it's backed by a cache that hasn't finished warming). if (!ruleProvider.isReady()) { LOG.info( "Rule provider [%s] is not ready, skipping reindexing job creation for dataSource[%s]", @@ -328,86 +316,32 @@ public List createCompactionJobs( return Collections.emptyList(); } - final List allJobs = new ArrayList<>(); - final DateTime currentTime = jobParams.getScheduleStartTime(); - - SegmentTimeline timeline = jobParams.getTimeline(dataSource); - + final SegmentTimeline timeline = jobParams.getTimeline(dataSource); if (timeline == null || timeline.isEmpty()) { LOG.warn("Segment timeline null or empty for [%s] skipping creating compaction jobs.", dataSource); return Collections.emptyList(); } - List searchIntervals = generateAlignedSearchIntervals(currentTime); - if (searchIntervals.isEmpty()) { - LOG.warn("No search intervals generated for dataSource[%s], no reindexing jobs will be created", dataSource); - return Collections.emptyList(); - } + final ReindexingPlan plan = new ReindexingPlanner(this).plan(jobParams.getScheduleStartTime(), timeline); - // Adjust timeline interval by applying user defined skip offset (if any exists) - Interval adjustedTimelineInterval = applySkipOffset( - new Interval(timeline.first().getInterval().getStart(), timeline.last().getInterval().getEnd()), - jobParams.getScheduleStartTime() - ); - if (adjustedTimelineInterval == null) { - LOG.warn("All data for dataSource[%s] is within skip offsets, no reindexing jobs will be created", dataSource); - return Collections.emptyList(); - } - - for (int i = 0; i < searchIntervals.size(); i++) { - IntervalPartitioningInfo intervalInfo = searchIntervals.get(i); - Interval reindexingInterval = intervalInfo.getInterval(); - - if (!reindexingInterval.overlaps(adjustedTimelineInterval)) { - // No underlying data exists to reindex for this interval - LOG.debug("Search interval[%s] does not overlap with data range[%s], skipping", reindexingInterval, adjustedTimelineInterval); + final List allJobs = new ArrayList<>(); + for (ReindexingPlan.PlannedInterval planned : plan.getIntervals()) { + if (!planned.isJobEligible()) { continue; } - - // Skip offsets, if configured, can result in needing to truncate a search interval. If the truncation makes the interval invalid, skip it. - if ((skipOffsetFromNow != null || skipOffsetFromLatest != null) && - intervalEndsAfter(reindexingInterval, adjustedTimelineInterval.getEnd())) { - - DateTime alignedEnd = intervalInfo.getGranularity().bucketStart(adjustedTimelineInterval.getEnd()); - if (!alignedEnd.isAfter(reindexingInterval.getStart())) { - LOG.debug("Search interval[%s] is entirely within skip offset, skipping", reindexingInterval); - continue; - } - reindexingInterval = new Interval(reindexingInterval.getStart(), alignedEnd); - // Replace the entry in searchIntervals so the downstream synthetic-timeline lookup - // in ReindexingConfigBuilder matches the truncated interval. - intervalInfo = new IntervalPartitioningInfo( - reindexingInterval, - intervalInfo.getSourceRule(), - intervalInfo.isRuleSynthetic() - ); - searchIntervals.set(i, intervalInfo); - } - - InlineSchemaDataSourceCompactionConfig.Builder builder = createBaseBuilder(); - - ReindexingConfigBuilder configBuilder = new ReindexingConfigBuilder( - ruleProvider, - reindexingInterval, - currentTime, - searchIntervals, - tuningConfig + LOG.debug( + "Creating reindexing jobs for interval[%s] with [%d] rules selected", + planned.getInterval(), + planned.getRuleCount() + ); + allJobs.addAll( + createJobsForSearchInterval( + createJobTemplateForInterval(planned.getConfig()), + planned.getInterval(), + source, + jobParams + ) ); - int ruleCount = configBuilder.applyTo(builder); - - if (ruleCount > 0) { - LOG.debug("Creating reindexing jobs for interval[%s] with [%d] rules selected", reindexingInterval, ruleCount); - allJobs.addAll( - createJobsForSearchInterval( - createJobTemplateForInterval(builder.build()), - reindexingInterval, - source, - jobParams - ) - ); - } else { - LOG.debug("No applicable reindexing rules found for interval[%s]", reindexingInterval); - } } return allJobs; } @@ -421,34 +355,35 @@ protected CompactionJobTemplate createJobTemplateForInterval( } /** - * Applies the configured skip offset to an interval by adjusting its end time. Uses either - * skipOffsetFromNow (relative to reference time) or skipOffsetFromLatest (relative to interval end). - * Returns null if the adjusted end would be before the interval start. - * - * @param interval the interval to adjust - * @param skipFromNowReferenceTime the reference time for skipOffsetFromNow calculation - * @return the interval with adjusted end time, or null if the result would be invalid + * Builds a configuration view of this template's rule timeline against the given reference time and + * live segment timeline. Used by the API endpoint that exposes the reindexing timeline to operators. + * Callers must short-circuit before invoking this when {@code timeline} is null or empty. */ + ReindexingTimelineView buildTimelineView(DateTime referenceTime, SegmentTimeline timeline) + { + return ReindexingTimelineView.fromPlan(new ReindexingPlanner(this).plan(referenceTime, timeline)); + } + + // ── Accessors used by ReindexingPlanner. Package-private to keep this surface internal. ── + + ReindexingRuleProvider getReindexingRuleProvider() + { + return ruleProvider; + } + @Nullable - private Interval applySkipOffset( - Interval interval, - DateTime skipFromNowReferenceTime - ) + Period getSkipOffsetFromNowOrNull() { - DateTime maybeAdjustedEnd = interval.getEnd(); - if (skipOffsetFromNow != null) { - maybeAdjustedEnd = skipFromNowReferenceTime.minus(skipOffsetFromNow); - } else if (skipOffsetFromLatest != null) { - maybeAdjustedEnd = maybeAdjustedEnd.minus(skipOffsetFromLatest); - } - if (maybeAdjustedEnd.isBefore(interval.getStart())) { - return null; - } else { - return new Interval(interval.getStart(), maybeAdjustedEnd); - } + return skipOffsetFromNow; + } + + @Nullable + Period getSkipOffsetFromLatestOrNull() + { + return skipOffsetFromLatest; } - private InlineSchemaDataSourceCompactionConfig.Builder createBaseBuilder() + InlineSchemaDataSourceCompactionConfig.Builder createBaseConfigBuilder() { return InlineSchemaDataSourceCompactionConfig .builder() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionScheduler.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionScheduler.java index 6f5ed1a7a6ef..f7a0affd1ab0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionScheduler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionScheduler.java @@ -19,12 +19,14 @@ package org.apache.druid.indexing.compact; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.server.compaction.CompactionSimulateResult; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.ClusterCompactionConfig; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DruidCompactionConfig; +import org.joda.time.DateTime; import java.util.Map; @@ -86,4 +88,16 @@ public interface CompactionScheduler */ CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionConfig updateRequest); + /** + * Builds a preview of the reindexing timeline for a compaction supervisor whose template is a + * {@link CascadingReindexingTemplate}. The preview reflects exactly what the next scheduler run + * would compact: it resolves rules against the live segment timeline and applies any configured + * skip offset. Returns an empty intervals list when no segments exist for the datasource yet. + * + * @param spec supervisor spec; must be a {@code CompactionSupervisorSpec} carrying a + * {@code CascadingReindexingTemplate}, otherwise a {@code DruidException} of + * category {@code INVALID_INPUT} is thrown + * @param referenceTime time to evaluate rule periods against + */ + ReindexingTimelineView previewReindexingTimeline(SupervisorSpec spec, DateTime referenceTime); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java index c2a0dabb65ad..b987fdf97c30 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java @@ -34,6 +34,7 @@ import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerListener; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; @@ -59,6 +60,8 @@ import org.apache.druid.server.coordinator.stats.CoordinatorStat; import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; +import org.apache.druid.timeline.SegmentTimeline; +import org.joda.time.DateTime; import java.util.Collections; import java.util.List; @@ -499,6 +502,42 @@ public CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionCon } } + @Override + public ReindexingTimelineView previewReindexingTimeline(SupervisorSpec spec, DateTime referenceTime) + { + if (!(spec instanceof CompactionSupervisorSpec)) { + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + "Supervisor[%s] is not a compaction supervisor", + spec.getId() + ); + } + final CompactionSupervisorSpec compactionSpec = (CompactionSupervisorSpec) spec; + final CompactionJobTemplate template = compactionSpec.getTemplate(); + if (!(template instanceof CascadingReindexingTemplate)) { + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + "Reindexing timeline is not available for supervisor[%s] as it does not use a cascading reindexing template", + spec.getId() + ); + } + final CascadingReindexingTemplate cascadingTemplate = (CascadingReindexingTemplate) template; + final String dataSource = cascadingTemplate.getDataSource(); + final SegmentTimeline timeline = getDatasourceSnapshot() + .getUsedSegmentsTimelinesPerDataSource() + .get(dataSource); + + // No data yet for this datasource — returning configured intervals would be misleading + // since there's nothing the rules would actually be applied against. + if (timeline == null || timeline.isEmpty()) { + return new ReindexingTimelineView(dataSource, referenceTime, null, Collections.emptyList(), null); + } + + return cascadingTemplate.buildTimelineView(referenceTime, timeline); + } + private void emitStat(CoordinatorStat stat, RowKey rowKey, long value) { if (!stat.shouldEmit()) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingPlan.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingPlan.java new file mode 100644 index 000000000000..97616b6270f4 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingPlan.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.compact; + +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.server.compaction.ReindexingPartitioningRule; +import org.apache.druid.server.compaction.ReindexingRule; +import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; + +/** + * Internal description of a cascading reindexing plan: the search intervals and, for each, + * the rules that apply and the resolved compaction config. Produced once by {@link ReindexingPlanner} + * and walked by both {@link CascadingReindexingTemplate#createCompactionJobs} (which emits jobs) + * and {@link ReindexingTimelineView#fromPlan} (which formats for the API). Package-private — + * this is not part of any public contract. + */ +final class ReindexingPlan +{ + private final String dataSource; + private final DateTime referenceTime; + @Nullable + private final SkipOffsetResolution skipOffset; + private final List intervals; + @Nullable + private final PlanValidationError validationError; + + ReindexingPlan( + String dataSource, + DateTime referenceTime, + @Nullable SkipOffsetResolution skipOffset, + List intervals, + @Nullable PlanValidationError validationError + ) + { + this.dataSource = dataSource; + this.referenceTime = referenceTime; + this.skipOffset = skipOffset; + this.intervals = Collections.unmodifiableList(intervals); + this.validationError = validationError; + } + + String getDataSource() + { + return dataSource; + } + + DateTime getReferenceTime() + { + return referenceTime; + } + + @Nullable + SkipOffsetResolution getSkipOffset() + { + return skipOffset; + } + + List getIntervals() + { + return intervals; + } + + @Nullable + PlanValidationError getValidationError() + { + return validationError; + } + + /** + * How a search interval was resolved against data overlap and skip-offset boundaries. + * Drives whether job creation emits a job for the interval and how the API view renders it. + */ + enum IntervalDisposition + { + /** Interval is fully within the compactable range; emit a job for the original interval. */ + INCLUDED, + /** Interval extended past a skip-offset or data boundary and was clipped; emit a job for the clipped range. */ + TRUNCATED, + /** Interval lies entirely past the skip-offset boundary (no data eligible); no job. */ + SKIPPED_BEYOND_BOUNDARY, + /** Interval has no overlap with the live segment timeline; no job. */ + SKIPPED_NO_DATA + } + + static final class PlannedInterval + { + private final Interval interval; + private final Interval originalInterval; + private final IntervalDisposition disposition; + private final ReindexingPartitioningRule sourceRule; + private final boolean ruleSynthetic; + private final Granularity segmentGranularity; + private final int ruleCount; + @Nullable + private final InlineSchemaDataSourceCompactionConfig config; + private final List appliedRules; + + PlannedInterval( + Interval interval, + Interval originalInterval, + IntervalDisposition disposition, + ReindexingPartitioningRule sourceRule, + boolean ruleSynthetic, + Granularity segmentGranularity, + int ruleCount, + @Nullable InlineSchemaDataSourceCompactionConfig config, + List appliedRules + ) + { + this.interval = interval; + this.originalInterval = originalInterval; + this.disposition = disposition; + this.sourceRule = sourceRule; + this.ruleSynthetic = ruleSynthetic; + this.segmentGranularity = segmentGranularity; + this.ruleCount = ruleCount; + this.config = config; + this.appliedRules = Collections.unmodifiableList(appliedRules); + } + + /** Interval after any skip-offset/data-boundary truncation. This is what a compaction job would target. */ + Interval getInterval() + { + return interval; + } + + /** Interval as originally generated from the rule timeline, before any truncation. */ + Interval getOriginalInterval() + { + return originalInterval; + } + + IntervalDisposition getDisposition() + { + return disposition; + } + + ReindexingPartitioningRule getSourceRule() + { + return sourceRule; + } + + boolean isRuleSynthetic() + { + return ruleSynthetic; + } + + Granularity getSegmentGranularity() + { + return segmentGranularity; + } + + int getRuleCount() + { + return ruleCount; + } + + @Nullable + InlineSchemaDataSourceCompactionConfig getConfig() + { + return config; + } + + List getAppliedRules() + { + return appliedRules; + } + + /** True if a compaction job should be emitted for this interval (rules resolved and not skipped). */ + boolean isJobEligible() + { + return config != null + && (disposition == IntervalDisposition.INCLUDED || disposition == IntervalDisposition.TRUNCATED); + } + } + + /** + * The configured skip offset resolved against the live segment timeline. Present only when + * a skip offset is configured on the template. Always fully resolved — there is no + * "configured but not applied" state in the plan, because the planner requires a live timeline. + */ + static final class SkipOffsetResolution + { + enum Type + { + FROM_NOW, + FROM_LATEST + } + + private final Type type; + private final Period period; + private final DateTime effectiveEndTime; + + SkipOffsetResolution(Type type, Period period, DateTime effectiveEndTime) + { + this.type = type; + this.period = period; + this.effectiveEndTime = effectiveEndTime; + } + + Type getType() + { + return type; + } + + Period getPeriod() + { + return period; + } + + DateTime getEffectiveEndTime() + { + return effectiveEndTime; + } + } + + /** + * Captured when planning fails before any intervals can be produced (e.g., misconfigured + * granularity ordering, no rules). The plan still carries the error so callers can surface + * it as part of the response rather than as an exception. + */ + static final class PlanValidationError + { + enum Type + { + INVALID_GRANULARITY_TIMELINE, + VALIDATION_ERROR + } + + private final Type type; + private final String message; + @Nullable + private final Interval olderInterval; + @Nullable + private final Granularity olderGranularity; + @Nullable + private final Interval newerInterval; + @Nullable + private final Granularity newerGranularity; + + PlanValidationError(Type type, String message) + { + this(type, message, null, null, null, null); + } + + PlanValidationError( + Type type, + String message, + @Nullable Interval olderInterval, + @Nullable Granularity olderGranularity, + @Nullable Interval newerInterval, + @Nullable Granularity newerGranularity + ) + { + this.type = type; + this.message = message; + this.olderInterval = olderInterval; + this.olderGranularity = olderGranularity; + this.newerInterval = newerInterval; + this.newerGranularity = newerGranularity; + } + + Type getType() + { + return type; + } + + String getMessage() + { + return message; + } + + @Nullable + Interval getOlderInterval() + { + return olderInterval; + } + + @Nullable + Granularity getOlderGranularity() + { + return olderGranularity; + } + + @Nullable + Interval getNewerInterval() + { + return newerInterval; + } + + @Nullable + Granularity getNewerGranularity() + { + return newerGranularity; + } + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingPlanner.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingPlanner.java new file mode 100644 index 000000000000..aae5bd85e46c --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingPlanner.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.compact; + +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.compaction.IntervalPartitioningInfo; +import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig; +import org.apache.druid.timeline.SegmentTimeline; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Builds a {@link ReindexingPlan} from a {@link CascadingReindexingTemplate} for a given reference + * time and the live segment timeline. The plan is the single source of truth for both + * compaction-job creation and the API timeline view, ensuring the two cannot drift. + * + *

The data range bounds the planned intervals (no-overlap intervals become + * {@link ReindexingPlan.IntervalDisposition#SKIPPED_NO_DATA}), {@code skipOffsetFromLatest} is + * anchored on the live data, and intervals that extend past the skip boundary are truncated or + * marked {@link ReindexingPlan.IntervalDisposition#SKIPPED_BEYOND_BOUNDARY}. + * + *

Validation failures (misconfigured granularity ordering, no rules) are caught and surfaced + * on the returned plan as a {@link ReindexingPlan.PlanValidationError} so HTTP callers can render + * them in the response rather than as 500s. + */ +final class ReindexingPlanner +{ + private static final Logger LOG = new Logger(ReindexingPlanner.class); + + private final CascadingReindexingTemplate template; + + ReindexingPlanner(CascadingReindexingTemplate template) + { + this.template = template; + } + + /** + * Build a plan for the given reference time. The plan reflects exactly what + * {@link CascadingReindexingTemplate#createCompactionJobs} would do given the same + * {@code timeline} and reference time. + * + * @param timeline live segment timeline for the datasource; must be non-null and non-empty — + * callers are responsible for short-circuiting when no data exists rather than + * calling this with a fabricated empty timeline. + */ + ReindexingPlan plan(DateTime referenceTime, SegmentTimeline timeline) + { + final String dataSource = template.getDataSource(); + + if (!template.getReindexingRuleProvider().isReady()) { + LOG.info( + "Rule provider [%s] is not ready for dataSource[%s], returning empty plan", + template.getReindexingRuleProvider().getType(), + dataSource + ); + return new ReindexingPlan(dataSource, referenceTime, null, Collections.emptyList(), null); + } + + final List searchIntervals; + try { + searchIntervals = template.generateAlignedSearchIntervals(referenceTime); + } + catch (SegmentGranularityTimelineValidationException e) { + LOG.warn(e, "Granularity timeline validation failed for dataSource[%s]", dataSource); + return new ReindexingPlan( + dataSource, + referenceTime, + null, + Collections.emptyList(), + new ReindexingPlan.PlanValidationError( + ReindexingPlan.PlanValidationError.Type.INVALID_GRANULARITY_TIMELINE, + e.getMessage(), + e.getOlderInterval(), + e.getOlderGranularity(), + e.getNewerInterval(), + e.getNewerGranularity() + ) + ); + } + catch (IAE e) { + LOG.warn(e, "Validation failed while planning timeline for dataSource[%s]", dataSource); + return new ReindexingPlan( + dataSource, + referenceTime, + null, + Collections.emptyList(), + new ReindexingPlan.PlanValidationError( + ReindexingPlan.PlanValidationError.Type.VALIDATION_ERROR, + e.getMessage() + ) + ); + } + + if (searchIntervals.isEmpty()) { + LOG.debug("No search intervals generated for dataSource[%s]", dataSource); + return new ReindexingPlan(dataSource, referenceTime, null, Collections.emptyList(), null); + } + + final ReindexingPlan.SkipOffsetResolution skipOffsetResolution = buildSkipOffsetResolution(timeline, referenceTime); + final Interval dataRangeWithSkipOffset = computeDataRangeWithSkipOffset(timeline, referenceTime); + if (dataRangeWithSkipOffset == null) { + LOG.debug("All data for dataSource[%s] is within skip offsets; no intervals will be planned", dataSource); + return new ReindexingPlan(dataSource, referenceTime, skipOffsetResolution, Collections.emptyList(), null); + } + + final List planned = new ArrayList<>(searchIntervals.size()); + for (int i = 0; i < searchIntervals.size(); i++) { + final IntervalPartitioningInfo originalInfo = searchIntervals.get(i); + final Interval originalInterval = originalInfo.getInterval(); + + if (!originalInterval.overlaps(dataRangeWithSkipOffset)) { + planned.add(noRuleEntry(originalInterval, originalInterval, ReindexingPlan.IntervalDisposition.SKIPPED_NO_DATA, originalInfo)); + continue; + } + + // When a skip offset is configured, intervals extending past the adjusted data-range end + // must be truncated (or skipped entirely if the truncation eats the whole interval). + final DateTime truncationBoundary = + (template.getSkipOffsetFromNowOrNull() != null || template.getSkipOffsetFromLatestOrNull() != null) + ? dataRangeWithSkipOffset.getEnd() + : null; + + Interval effectiveInterval = originalInterval; + ReindexingPlan.IntervalDisposition disposition = ReindexingPlan.IntervalDisposition.INCLUDED; + + if (truncationBoundary != null && originalInterval.getEnd().isAfter(truncationBoundary)) { + final DateTime alignedEnd = originalInfo.getGranularity().bucketStart(truncationBoundary); + if (!alignedEnd.isAfter(originalInterval.getStart())) { + planned.add(noRuleEntry(originalInterval, originalInterval, ReindexingPlan.IntervalDisposition.SKIPPED_BEYOND_BOUNDARY, originalInfo)); + continue; + } + effectiveInterval = new Interval(originalInterval.getStart(), alignedEnd); + disposition = ReindexingPlan.IntervalDisposition.TRUNCATED; + // Replace the entry so the synthetic-timeline lookup in ReindexingConfigBuilder matches the truncated interval. + searchIntervals.set( + i, + new IntervalPartitioningInfo(effectiveInterval, originalInfo.getSourceRule(), originalInfo.isRuleSynthetic()) + ); + } + + final InlineSchemaDataSourceCompactionConfig.Builder builder = template.createBaseConfigBuilder(); + final ReindexingConfigBuilder configBuilder = new ReindexingConfigBuilder( + template.getReindexingRuleProvider(), + effectiveInterval, + referenceTime, + searchIntervals, + template.getTuningConfig() + ); + final ReindexingConfigBuilder.BuildResult buildResult = configBuilder.applyToWithDetails(builder); + + if (buildResult.getRuleCount() > 0) { + planned.add(new ReindexingPlan.PlannedInterval( + effectiveInterval, + originalInterval, + disposition, + originalInfo.getSourceRule(), + originalInfo.isRuleSynthetic(), + originalInfo.getGranularity(), + buildResult.getRuleCount(), + builder.build(), + buildResult.getAppliedRules() + )); + } else { + // Interval is geometrically valid but no rules apply to it. We retain INCLUDED/TRUNCATED disposition + // (the geometry didn't skip it) but mark config null so it won't generate a job and the view can + // choose to hide it. + planned.add(new ReindexingPlan.PlannedInterval( + effectiveInterval, + originalInterval, + disposition, + originalInfo.getSourceRule(), + originalInfo.isRuleSynthetic(), + originalInfo.getGranularity(), + 0, + null, + Collections.emptyList() + )); + } + } + + return new ReindexingPlan(dataSource, referenceTime, skipOffsetResolution, planned, null); + } + + /** + * Computes the data-range interval clipped by skip offsets. Returns null when the entire data + * range falls within skip offsets and nothing remains to plan. + */ + @Nullable + private Interval computeDataRangeWithSkipOffset(SegmentTimeline timeline, DateTime referenceTime) + { + final Interval dataBounds = new Interval( + timeline.first().getInterval().getStart(), + timeline.last().getInterval().getEnd() + ); + + final Period skipFromNow = template.getSkipOffsetFromNowOrNull(); + final Period skipFromLatest = template.getSkipOffsetFromLatestOrNull(); + + DateTime end = dataBounds.getEnd(); + if (skipFromNow != null) { + end = referenceTime.minus(skipFromNow); + } else if (skipFromLatest != null) { + end = end.minus(skipFromLatest); + } + if (end.isBefore(dataBounds.getStart())) { + return null; + } + return new Interval(dataBounds.getStart(), end); + } + + /** + * Builds the skip-offset resolution stamp for the plan. Returns null when no skip offset is configured. + */ + @Nullable + private ReindexingPlan.SkipOffsetResolution buildSkipOffsetResolution(SegmentTimeline timeline, DateTime referenceTime) + { + final Period skipFromNow = template.getSkipOffsetFromNowOrNull(); + if (skipFromNow != null) { + return new ReindexingPlan.SkipOffsetResolution( + ReindexingPlan.SkipOffsetResolution.Type.FROM_NOW, + skipFromNow, + referenceTime.minus(skipFromNow) + ); + } + final Period skipFromLatest = template.getSkipOffsetFromLatestOrNull(); + if (skipFromLatest != null) { + return new ReindexingPlan.SkipOffsetResolution( + ReindexingPlan.SkipOffsetResolution.Type.FROM_LATEST, + skipFromLatest, + timeline.last().getInterval().getEnd().minus(skipFromLatest) + ); + } + return null; + } + + private static ReindexingPlan.PlannedInterval noRuleEntry( + Interval interval, + Interval originalInterval, + ReindexingPlan.IntervalDisposition disposition, + IntervalPartitioningInfo info + ) + { + return new ReindexingPlan.PlannedInterval( + interval, + originalInterval, + disposition, + info.getSourceRule(), + info.isRuleSynthetic(), + info.getGranularity(), + 0, + null, + Collections.emptyList() + ); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingTimelineView.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingTimelineView.java new file mode 100644 index 000000000000..936a9ae56d90 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/ReindexingTimelineView.java @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.compact; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.server.compaction.ReindexingDataSchemaRule; +import org.apache.druid.server.compaction.ReindexingDeletionRule; +import org.apache.druid.server.compaction.ReindexingIndexSpecRule; +import org.apache.druid.server.compaction.ReindexingPartitioningRule; +import org.apache.druid.server.compaction.ReindexingRule; +import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.joda.time.Period; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Public API DTO describing the timeline of search intervals and their associated reindexing + * configurations for a cascading reindexing supervisor. Surfaced through the + * {@code /supervisor/{id}/reindexingTimeline} endpoint so operators (and the web console) + * can visualize how rules are applied across time. + * + *

Always produced via {@link #fromPlan(ReindexingPlan)} so the API view is a pure projection + * of the same plan that drives job creation — the two cannot drift. + */ +public class ReindexingTimelineView +{ + private final String dataSource; + private final DateTime referenceTime; + @Nullable + private final SkipOffsetInfo skipOffset; + private final List intervals; + @Nullable + private final ValidationError validationError; + + @JsonCreator + public ReindexingTimelineView( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("referenceTime") DateTime referenceTime, + @JsonProperty("skipOffset") @Nullable SkipOffsetInfo skipOffset, + @JsonProperty("intervals") List intervals, + @JsonProperty("validationError") @Nullable ValidationError validationError + ) + { + this.dataSource = dataSource; + this.referenceTime = referenceTime; + this.skipOffset = skipOffset; + this.intervals = Collections.unmodifiableList(intervals); + this.validationError = validationError; + } + + /** + * Projects a {@link ReindexingPlan} into the API view. Intervals that have rules applied + * are emitted with their resolved config; intervals fully eclipsed by a skip-offset boundary + * are emitted with zero rules so the UI can render the skipped span. Intervals with no data + * overlap (a job-path concern) are not surfaced — operators see only ranges that would + * actually be compacted or that are explicitly skipped. + */ + static ReindexingTimelineView fromPlan(ReindexingPlan plan) + { + final List intervalConfigs = new ArrayList<>(); + for (ReindexingPlan.PlannedInterval planned : plan.getIntervals()) { + switch (planned.getDisposition()) { + case INCLUDED: + case TRUNCATED: + if (planned.getRuleCount() > 0) { + intervalConfigs.add(new IntervalConfig( + planned.getInterval(), + planned.getRuleCount(), + planned.getConfig(), + planned.getAppliedRules() + )); + } + break; + case SKIPPED_BEYOND_BOUNDARY: + intervalConfigs.add(new IntervalConfig( + planned.getInterval(), + 0, + null, + Collections.emptyList() + )); + break; + case SKIPPED_NO_DATA: + // Operator-visible timeline only shows ranges that would be compacted or are + // explicitly skipped by configuration. Ranges with no underlying data are omitted. + break; + } + } + + return new ReindexingTimelineView( + plan.getDataSource(), + plan.getReferenceTime(), + SkipOffsetInfo.fromPlan(plan.getSkipOffset()), + intervalConfigs, + ValidationError.fromPlan(plan.getValidationError()) + ); + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public DateTime getReferenceTime() + { + return referenceTime; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + public SkipOffsetInfo getSkipOffset() + { + return skipOffset; + } + + @JsonProperty + public List getIntervals() + { + return intervals; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + public ValidationError getValidationError() + { + return validationError; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ReindexingTimelineView that = (ReindexingTimelineView) o; + return Objects.equals(dataSource, that.dataSource) + && Objects.equals(referenceTime, that.referenceTime) + && Objects.equals(skipOffset, that.skipOffset) + && Objects.equals(intervals, that.intervals) + && Objects.equals(validationError, that.validationError); + } + + @Override + public int hashCode() + { + return Objects.hash(dataSource, referenceTime, skipOffset, intervals, validationError); + } + + /** + * Describes a granularity-ordering or configuration error encountered while building the timeline. + * Surfaced in the response body rather than as a 5xx so clients can render meaningful messages. + */ + public static class ValidationError + { + private final String errorType; + private final String message; + @Nullable + private final String olderInterval; + @Nullable + private final String olderGranularity; + @Nullable + private final String newerInterval; + @Nullable + private final String newerGranularity; + + @JsonCreator + public ValidationError( + @JsonProperty("errorType") String errorType, + @JsonProperty("message") String message, + @JsonProperty("olderInterval") @Nullable String olderInterval, + @JsonProperty("olderGranularity") @Nullable String olderGranularity, + @JsonProperty("newerInterval") @Nullable String newerInterval, + @JsonProperty("newerGranularity") @Nullable String newerGranularity + ) + { + this.errorType = errorType; + this.message = message; + this.olderInterval = olderInterval; + this.olderGranularity = olderGranularity; + this.newerInterval = newerInterval; + this.newerGranularity = newerGranularity; + } + + @Nullable + static ValidationError fromPlan(@Nullable ReindexingPlan.PlanValidationError error) + { + if (error == null) { + return null; + } + return new ValidationError( + error.getType().name(), + error.getMessage(), + error.getOlderInterval() == null ? null : error.getOlderInterval().toString(), + error.getOlderGranularity() == null ? null : error.getOlderGranularity().toString(), + error.getNewerInterval() == null ? null : error.getNewerInterval().toString(), + error.getNewerGranularity() == null ? null : error.getNewerGranularity().toString() + ); + } + + @JsonProperty + public String getErrorType() + { + return errorType; + } + + @JsonProperty + public String getMessage() + { + return message; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + public String getOlderInterval() + { + return olderInterval; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + public String getOlderGranularity() + { + return olderGranularity; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + public String getNewerInterval() + { + return newerInterval; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + public String getNewerGranularity() + { + return newerGranularity; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ValidationError that = (ValidationError) o; + return Objects.equals(errorType, that.errorType) + && Objects.equals(message, that.message) + && Objects.equals(olderInterval, that.olderInterval) + && Objects.equals(olderGranularity, that.olderGranularity) + && Objects.equals(newerInterval, that.newerInterval) + && Objects.equals(newerGranularity, that.newerGranularity); + } + + @Override + public int hashCode() + { + return Objects.hash(errorType, message, olderInterval, olderGranularity, newerInterval, newerGranularity); + } + } + + /** + * Describes the configured skip offset that was applied to this view. Only present in the + * response when the configured skip offset was actually resolved against the live segment + * timeline — operators don't see a "configured but not applied" state because the scheduler + * short-circuits to an empty timeline when no segments exist. + */ + public static class SkipOffsetInfo + { + private final String type; + private final Period period; + private final DateTime effectiveEndTime; + + @JsonCreator + public SkipOffsetInfo( + @JsonProperty("type") String type, + @JsonProperty("period") Period period, + @JsonProperty("effectiveEndTime") DateTime effectiveEndTime + ) + { + this.type = type; + this.period = period; + this.effectiveEndTime = effectiveEndTime; + } + + @Nullable + static SkipOffsetInfo fromPlan(@Nullable ReindexingPlan.SkipOffsetResolution resolution) + { + if (resolution == null) { + return null; + } + final String typeName = resolution.getType() == ReindexingPlan.SkipOffsetResolution.Type.FROM_NOW + ? "skipOffsetFromNow" + : "skipOffsetFromLatest"; + return new SkipOffsetInfo(typeName, resolution.getPeriod(), resolution.getEffectiveEndTime()); + } + + @JsonProperty + public String getType() + { + return type; + } + + @JsonProperty + public Period getPeriod() + { + return period; + } + + @JsonProperty + public DateTime getEffectiveEndTime() + { + return effectiveEndTime; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SkipOffsetInfo that = (SkipOffsetInfo) o; + return Objects.equals(type, that.type) + && Objects.equals(period, that.period) + && Objects.equals(effectiveEndTime, that.effectiveEndTime); + } + + @Override + public int hashCode() + { + return Objects.hash(type, period, effectiveEndTime); + } + } + + /** + * A single search interval and its resolved reindexing configuration. {@code ruleCount==0} + * with a null {@code config} indicates a span that was eclipsed by the skip-offset boundary + * (no compaction would occur in that span). + */ + public static class IntervalConfig + { + private final Interval interval; + private final int ruleCount; + @Nullable + private final DataSourceCompactionConfig config; + private final List appliedRules; + + @JsonCreator + public IntervalConfig( + @JsonProperty("interval") Interval interval, + @JsonProperty("ruleCount") int ruleCount, + @JsonProperty("config") @Nullable DataSourceCompactionConfig config, + @JsonProperty("appliedRules") List appliedRules + ) + { + this.interval = interval; + this.ruleCount = ruleCount; + this.config = config; + this.appliedRules = Collections.unmodifiableList(appliedRules); + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @JsonProperty + public int getRuleCount() + { + return ruleCount; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + public DataSourceCompactionConfig getConfig() + { + return config; + } + + @JsonProperty + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") + @JsonSubTypes({ + @JsonSubTypes.Type(value = ReindexingDataSchemaRule.class, name = "dataSchema"), + @JsonSubTypes.Type(value = ReindexingDeletionRule.class, name = "deletion"), + @JsonSubTypes.Type(value = ReindexingPartitioningRule.class, name = "partitioning"), + @JsonSubTypes.Type(value = ReindexingIndexSpecRule.class, name = "indexSpec"), + }) + public List getAppliedRules() + { + return appliedRules; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IntervalConfig that = (IntervalConfig) o; + return ruleCount == that.ruleCount + && Objects.equals(interval, that.interval) + && Objects.equals(config, that.config) + && Objects.equals(appliedRules, that.appliedRules); + } + + @Override + public int hashCode() + { + return Objects.hash(interval, ruleCount, config, appliedRules); + } + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index aff9edf19af9..c45d631b6c4d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -36,9 +36,12 @@ import org.apache.druid.audit.AuditEntry; import org.apache.druid.audit.AuditManager; import org.apache.druid.error.DruidException; +import org.apache.druid.indexing.compact.CompactionScheduler; +import org.apache.druid.indexing.compact.ReindexingTimelineView; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.http.security.SupervisorResourceFilter; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; import org.apache.druid.segment.incremental.ParseExceptionReport; @@ -52,6 +55,7 @@ import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; import org.apache.druid.utils.CollectionUtils; +import org.joda.time.DateTime; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -100,6 +104,7 @@ public class SupervisorResource private final ObjectMapper objectMapper; private final AuditManager auditManager; private final AuthConfig authConfig; + private final CompactionScheduler compactionScheduler; @Inject public SupervisorResource( @@ -107,7 +112,8 @@ public SupervisorResource( AuthorizerMapper authorizerMapper, ObjectMapper objectMapper, AuthConfig authConfig, - AuditManager auditManager + AuditManager auditManager, + CompactionScheduler compactionScheduler ) { this.taskMaster = taskMaster; @@ -115,6 +121,7 @@ public SupervisorResource( this.objectMapper = objectMapper; this.authConfig = authConfig; this.auditManager = auditManager; + this.compactionScheduler = compactionScheduler; } @POST @@ -362,6 +369,68 @@ public Response getAllTaskStats( ); } + @GET + @Path("/{id}/reindexingTimeline") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(SupervisorResourceFilter.class) + public Response getReindexingTimeline( + @PathParam("id") final String id, + @QueryParam("referenceTime") @Nullable final String referenceTimeStr + ) + { + final DateTime referenceTime; + try { + referenceTime = referenceTimeStr == null ? DateTimes.nowUtc() : DateTimes.of(referenceTimeStr); + } + catch (IllegalArgumentException e) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of( + "error", + StringUtils.format( + "Reference time[%s] is in invalid format. Use ISO 8601 standard format.", + referenceTimeStr + ) + )) + .build(); + } + + return asLeaderWithSupervisorManager( + manager -> { + final Optional specOptional = manager.getSupervisorSpec(id); + if (!specOptional.isPresent()) { + return Response.status(Response.Status.NOT_FOUND) + .entity(ImmutableMap.of("error", StringUtils.format("Supervisor[%s] does not exist", id))) + .build(); + } + try { + final ReindexingTimelineView view = compactionScheduler.previewReindexingTimeline( + specOptional.get(), + referenceTime + ); + return Response.ok(view).build(); + } + catch (DruidException e) { + return Response.status(httpStatusFor(e)) + .entity(ImmutableMap.of("error", e.getMessage())) + .build(); + } + } + ); + } + + private static Response.Status httpStatusFor(DruidException e) + { + switch (e.getCategory()) { + case NOT_FOUND: + return Response.Status.NOT_FOUND; + case INVALID_INPUT: + case UNSUPPORTED: + return Response.Status.BAD_REQUEST; + default: + return Response.Status.INTERNAL_SERVER_ERROR; + } + } + @GET @Path("/{id}/parseErrors") @Produces(MediaType.APPLICATION_JSON) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java index c3325725f16f..ba17494887d4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java @@ -1961,7 +1961,7 @@ private ReindexingRuleProvider createMockProvider(List periods) } ReindexingRuleProvider mockProvider = EasyMock.createMock(ReindexingRuleProvider.class); - EasyMock.expect(mockProvider.isReady()).andReturn(true); + EasyMock.expect(mockProvider.isReady()).andReturn(true).anyTimes(); EasyMock.expect(mockProvider.getPartitioningRules()).andReturn(partitioningRules).anyTimes(); // Return a fresh stream on each call to avoid "stream has already been operated upon or closed" errors EasyMock.expect(mockProvider.streamAllRules()).andAnswer(() -> partitioningRules.stream().map(r -> (ReindexingRule) r)).anyTimes(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/ReindexingPlannerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/ReindexingPlannerTest.java new file mode 100644 index 000000000000..d5464c8d9312 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/ReindexingPlannerTest.java @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.compact; + +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.filter.EqualityFilter; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.server.compaction.InlineReindexingRuleProvider; +import org.apache.druid.server.compaction.ReindexingDataSchemaRule; +import org.apache.druid.server.compaction.ReindexingDeletionRule; +import org.apache.druid.server.compaction.ReindexingPartitioningRule; +import org.apache.druid.server.compaction.ReindexingRule; +import org.apache.druid.server.compaction.ReindexingRuleProvider; +import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig; +import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentTimeline; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.joda.time.Period; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +/** + * Unit tests for {@link ReindexingPlanner}. The planner requires a non-null, non-empty + * {@link SegmentTimeline} — callers short-circuit on missing data before invoking it — so every + * test passes a constructed timeline that covers the rule range under test. + */ +public class ReindexingPlannerTest extends InitializedNullHandlingTest +{ + private static final DateTime REFERENCE_TIME = DateTimes.of("2025-02-01T00:00:00Z"); + // A timeline that covers everything the rules in these tests will look at. + private static final SegmentTimeline WIDE_TIMELINE = createTimeline( + DateTimes.of("2024-01-01T00:00:00Z"), + DateTimes.of("2025-01-31T23:00:00Z") + ); + + @Test + public void test_plan_comprehensive() + { + final ReindexingPartitioningRule partitioning7d = new ReindexingPartitioningRule( + "seg-gran-7d", + null, + Period.days(7), + Granularities.HOUR, + new DynamicPartitionsSpec(5000000, null), + null + ); + final ReindexingPartitioningRule partitioning30d = new ReindexingPartitioningRule( + "seg-gran-30d", + null, + Period.days(30), + Granularities.DAY, + new DynamicPartitionsSpec(5000000, null), + null + ); + final ReindexingDataSchemaRule dataSchema15d = new ReindexingDataSchemaRule( + "data-schema-15d", + null, + Period.days(15), + new UserCompactionTaskDimensionsConfig(null), + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + Granularities.MINUTE, + true, + null + ); + final ReindexingDeletionRule deletion10d = new ReindexingDeletionRule( + "deletion-10d", + null, + Period.days(10), + new EqualityFilter("country", ColumnType.STRING, "US", null), + null + ); + final ReindexingDeletionRule deletion20d = new ReindexingDeletionRule( + "deletion-20d", + null, + Period.days(20), + new EqualityFilter("device", ColumnType.STRING, "mobile", null), + null + ); + + final ReindexingRuleProvider provider = InlineReindexingRuleProvider + .builder() + .partitioningRules(List.of(partitioning7d, partitioning30d)) + .dataSchemaRules(List.of(dataSchema15d)) + .deletionRules(List.of(deletion10d, deletion20d)) + .build(); + + final CascadingReindexingTemplate template = newTemplate(provider, null); + final ReindexingPlan plan = new ReindexingPlanner(template).plan(REFERENCE_TIME, WIDE_TIMELINE); + + Assertions.assertEquals("testDS", plan.getDataSource()); + Assertions.assertEquals(REFERENCE_TIME, plan.getReferenceTime()); + Assertions.assertNull(plan.getValidationError()); + Assertions.assertNull(plan.getSkipOffset()); + Assertions.assertTrue(plan.getIntervals().size() >= 2, "Expected at least 2 intervals"); + + boolean foundPartitioning = false; + boolean foundDataSchema = false; + boolean foundDeletion = false; + + for (ReindexingPlan.PlannedInterval planned : plan.getIntervals()) { + Assertions.assertNotNull(planned.getInterval()); + if (planned.getRuleCount() == 0) { + continue; // SKIPPED_NO_DATA or no rules apply — outside the comprehensive coverage check. + } + Assertions.assertNotNull(planned.getConfig()); + Assertions.assertEquals(planned.getRuleCount(), planned.getAppliedRules().size()); + Assertions.assertEquals( + ReindexingPlan.IntervalDisposition.INCLUDED, + planned.getDisposition() + ); + + for (ReindexingRule rule : planned.getAppliedRules()) { + if (rule instanceof ReindexingPartitioningRule) { + foundPartitioning = true; + } else if (rule instanceof ReindexingDataSchemaRule) { + foundDataSchema = true; + } else if (rule instanceof ReindexingDeletionRule) { + foundDeletion = true; + } + } + } + + Assertions.assertTrue(foundPartitioning, "Plan should contain a segmentGranularity rule"); + Assertions.assertTrue(foundDataSchema, "Plan should contain a dataSchema rule"); + Assertions.assertTrue(foundDeletion, "Plan should contain a deletion rule"); + } + + @Test + public void test_plan_propagatesTemplateTuningConfig() + { + final ReindexingRuleProvider provider = InlineReindexingRuleProvider + .builder() + .partitioningRules(List.of( + new ReindexingPartitioningRule( + "seg-7d", + null, + Period.days(7), + Granularities.HOUR, + new DynamicPartitionsSpec(5000000, null), + null + ) + )) + .build(); + + final UserCompactionTaskQueryTuningConfig templateTuningConfig = UserCompactionTaskQueryTuningConfig.builder() + .maxRowsInMemory(12345) + .maxNumConcurrentSubTasks(7) + .maxRetry(9) + .build(); + + final CascadingReindexingTemplate template = newTemplate(provider, templateTuningConfig); + final ReindexingPlan plan = new ReindexingPlanner(template).plan(REFERENCE_TIME, WIDE_TIMELINE); + + boolean sawConfig = false; + for (ReindexingPlan.PlannedInterval planned : plan.getIntervals()) { + if (planned.getConfig() == null) { + continue; + } + sawConfig = true; + final UserCompactionTaskQueryTuningConfig effective = planned.getConfig().getTuningConfig(); + Assertions.assertNotNull(effective); + Assertions.assertEquals(Integer.valueOf(12345), effective.getMaxRowsInMemory()); + Assertions.assertEquals(Integer.valueOf(7), effective.getMaxNumConcurrentSubTasks()); + Assertions.assertEquals(Integer.valueOf(9), effective.getMaxRetry()); + } + Assertions.assertTrue(sawConfig, "Expected at least one interval with a non-null config"); + } + + @Test + public void test_plan_skipOffsetFromNow_skipsBeyondBoundary() + { + final DateTime referenceTime = DateTimes.of("2025-01-29T00:00:00Z"); + final Period skipOffset = Period.days(10); + + final ReindexingRuleProvider provider = InlineReindexingRuleProvider + .builder() + .partitioningRules(List.of( + new ReindexingPartitioningRule("seg-3d", null, Period.days(3), Granularities.HOUR, new DynamicPartitionsSpec(5000000, null), null), + new ReindexingPartitioningRule("seg-30d", null, Period.days(30), Granularities.DAY, new DynamicPartitionsSpec(5000000, null), null) + )) + .build(); + + final CascadingReindexingTemplate template = new CascadingReindexingTemplate( + "testDS", + null, + null, + provider, + null, + null, + skipOffset, + Granularities.DAY, + new DynamicPartitionsSpec(5000000, null), + null, + null + ); + + final SegmentTimeline timeline = createTimeline( + DateTimes.of("2024-01-01T00:00:00Z"), + DateTimes.of("2025-01-28T23:00:00Z") + ); + final ReindexingPlan plan = new ReindexingPlanner(template).plan(referenceTime, timeline); + + Assertions.assertNotNull(plan.getSkipOffset()); + Assertions.assertEquals(ReindexingPlan.SkipOffsetResolution.Type.FROM_NOW, plan.getSkipOffset().getType()); + Assertions.assertEquals(skipOffset, plan.getSkipOffset().getPeriod()); + Assertions.assertEquals(referenceTime.minus(skipOffset), plan.getSkipOffset().getEffectiveEndTime()); + + for (ReindexingPlan.PlannedInterval planned : plan.getIntervals()) { + if (planned.getOriginalInterval().getEnd().isAfter(plan.getSkipOffset().getEffectiveEndTime())) { + Assertions.assertTrue( + planned.getDisposition() == ReindexingPlan.IntervalDisposition.TRUNCATED + || planned.getDisposition() == ReindexingPlan.IntervalDisposition.SKIPPED_BEYOND_BOUNDARY, + "Interval beyond skip-offset boundary should be TRUNCATED or SKIPPED_BEYOND_BOUNDARY" + ); + } + } + } + + @Test + public void test_plan_skipOffsetFromLatest_anchorsOnData() + { + final DateTime referenceTime = DateTimes.of("2025-01-29T00:00:00Z"); + final DateTime latestData = DateTimes.of("2025-01-28T12:00:00Z"); + + final ReindexingRuleProvider provider = InlineReindexingRuleProvider + .builder() + .partitioningRules(List.of( + new ReindexingPartitioningRule( + "seg-7d", + null, + Period.days(7), + Granularities.DAY, + new DynamicPartitionsSpec(5000000, null), + null + ) + )) + .build(); + + final CascadingReindexingTemplate template = new CascadingReindexingTemplate( + "testDS", + null, + null, + provider, + null, + Period.hours(6), + null, + Granularities.DAY, + new DynamicPartitionsSpec(5000000, null), + null, + null + ); + + final SegmentTimeline timeline = createTimeline( + DateTimes.of("2024-12-01T00:00:00Z"), + latestData + ); + + final ReindexingPlan plan = new ReindexingPlanner(template).plan(referenceTime, timeline); + + Assertions.assertNotNull(plan.getSkipOffset()); + Assertions.assertEquals(ReindexingPlan.SkipOffsetResolution.Type.FROM_LATEST, plan.getSkipOffset().getType()); + Assertions.assertEquals(latestData.minus(Period.hours(6)), plan.getSkipOffset().getEffectiveEndTime()); + } + + @Test + public void test_plan_validationError_invalidGranularityTimeline() + { + final DateTime referenceTime = DateTimes.of("2025-01-29T16:15:00Z"); + + final ReindexingRuleProvider provider = InlineReindexingRuleProvider + .builder() + .partitioningRules(List.of( + new ReindexingPartitioningRule("hour-rule", null, Period.days(30), Granularities.HOUR, new DynamicPartitionsSpec(5000000, null), null), + new ReindexingPartitioningRule("day-rule", null, Period.days(90), Granularities.DAY, new DynamicPartitionsSpec(5000000, null), null) + )) + .dataSchemaRules(List.of( + new ReindexingDataSchemaRule( + "metrics-7d", + null, + Period.days(7), + null, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + null, + null, + null + ) + )) + .build(); + + final CascadingReindexingTemplate template = new CascadingReindexingTemplate( + "testDS", + null, + null, + provider, + null, + null, + null, + Granularities.MONTH, + new DynamicPartitionsSpec(5000000, null), + null, + null + ); + + final ReindexingPlan plan = new ReindexingPlanner(template).plan(referenceTime, WIDE_TIMELINE); + + Assertions.assertNotNull(plan.getValidationError()); + Assertions.assertEquals( + ReindexingPlan.PlanValidationError.Type.INVALID_GRANULARITY_TIMELINE, + plan.getValidationError().getType() + ); + Assertions.assertTrue(plan.getValidationError().getMessage().contains("Invalid segment granularity timeline")); + Assertions.assertNotNull(plan.getValidationError().getOlderInterval()); + Assertions.assertNotNull(plan.getValidationError().getOlderGranularity()); + Assertions.assertNotNull(plan.getValidationError().getNewerInterval()); + Assertions.assertNotNull(plan.getValidationError().getNewerGranularity()); + Assertions.assertTrue(plan.getIntervals().isEmpty(), "Intervals should be empty on validation error"); + } + + @Test + public void test_plan_ruleProviderNotReady_returnsEmptyPlan() + { + final ReindexingRuleProvider notReadyProvider = EasyMock.createMock(ReindexingRuleProvider.class); + EasyMock.expect(notReadyProvider.isReady()).andReturn(false).anyTimes(); + EasyMock.expect(notReadyProvider.getType()).andReturn("mock").anyTimes(); + EasyMock.replay(notReadyProvider); + + final CascadingReindexingTemplate template = newTemplate(notReadyProvider, null); + final ReindexingPlan plan = new ReindexingPlanner(template).plan(REFERENCE_TIME, WIDE_TIMELINE); + + Assertions.assertEquals("testDS", plan.getDataSource()); + Assertions.assertEquals(REFERENCE_TIME, plan.getReferenceTime()); + Assertions.assertTrue(plan.getIntervals().isEmpty()); + Assertions.assertNull(plan.getValidationError()); + Assertions.assertNull(plan.getSkipOffset()); + } + + @Test + public void test_plan_intervalsOutsideDataRangeAreSkipped() + { + final ReindexingRuleProvider provider = InlineReindexingRuleProvider + .builder() + .partitioningRules(List.of( + new ReindexingPartitioningRule( + "seg-7d", + null, + Period.days(7), + Granularities.DAY, + new DynamicPartitionsSpec(5000000, null), + null + ), + new ReindexingPartitioningRule( + "seg-90d", + null, + Period.days(90), + Granularities.DAY, + new DynamicPartitionsSpec(5000000, null), + null + ) + )) + .build(); + + final CascadingReindexingTemplate template = newTemplate(provider, null); + + // Timeline covers only a thin slice — most rule intervals will have no data overlap. + final SegmentTimeline timeline = createTimeline( + DateTimes.of("2024-12-01T00:00:00Z"), + DateTimes.of("2024-12-15T00:00:00Z") + ); + + final ReindexingPlan plan = new ReindexingPlanner(template).plan(REFERENCE_TIME, timeline); + + boolean sawSkippedNoData = false; + for (ReindexingPlan.PlannedInterval planned : plan.getIntervals()) { + if (planned.getDisposition() == ReindexingPlan.IntervalDisposition.SKIPPED_NO_DATA) { + sawSkippedNoData = true; + Assertions.assertNull(planned.getConfig()); + Assertions.assertEquals(0, planned.getRuleCount()); + Assertions.assertFalse(planned.isJobEligible()); + } + } + Assertions.assertTrue(sawSkippedNoData, "Expected at least one interval with disposition SKIPPED_NO_DATA"); + } + + private static CascadingReindexingTemplate newTemplate( + ReindexingRuleProvider provider, + UserCompactionTaskQueryTuningConfig tuningConfig + ) + { + return new CascadingReindexingTemplate( + "testDS", + null, + null, + provider, + null, + null, + null, + Granularities.DAY, + new DynamicPartitionsSpec(5000000, null), + null, + tuningConfig + ); + } + + private static SegmentTimeline createTimeline(DateTime start, DateTime end) + { + final DataSegment segment = DataSegment.builder() + .dataSource("testDS") + .interval(new Interval(start, end)) + .version("v1") + .size(1000) + .build(); + return SegmentTimeline.forSegments(Collections.singletonList(segment)); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/ReindexingTimelineViewTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/ReindexingTimelineViewTest.java new file mode 100644 index 000000000000..4475f552454d --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/ReindexingTimelineViewTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.compact; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.guice.SupervisorModule; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.server.compaction.ReindexingPartitioningRule; +import org.apache.druid.server.compaction.ReindexingRule; +import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.joda.time.Period; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +public class ReindexingTimelineViewTest +{ + private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); + + @BeforeEach + public void setUp() + { + OBJECT_MAPPER.registerModules(new SupervisorModule().getJacksonModules()); + } + + @Test + public void test_serde() throws Exception + { + final DateTime referenceTime = DateTimes.of("2025-01-29T16:15:00Z"); + + final ReindexingTimelineView.SkipOffsetInfo skipOffset = new ReindexingTimelineView.SkipOffsetInfo( + "skipOffsetFromLatest", + Period.days(1), + DateTimes.of("2025-01-28T16:15:00Z") + ); + final ReindexingTimelineView.ValidationError validationError = new ReindexingTimelineView.ValidationError( + "INVALID_GRANULARITY_TIMELINE", + "Granularity must decrease over time", + "2024-12-01/2025-01-01", + "MONTH", + "2025-01-01/2025-01-29", + "DAY" + ); + + final ReindexingTimelineView.IntervalConfig skippedInterval = new ReindexingTimelineView.IntervalConfig( + Intervals.of("2024-12-01/2025-01-01"), + 0, + null, + Collections.emptyList() + ); + final ReindexingTimelineView.IntervalConfig activeInterval = new ReindexingTimelineView.IntervalConfig( + Intervals.of("2025-01-01/2025-01-29"), + 1, + InlineSchemaDataSourceCompactionConfig.builder().forDataSource("testDS").build(), + List.of(new ReindexingPartitioningRule( + "day-rule", + null, + Period.days(7), + Granularities.DAY, + new DynamicPartitionsSpec(null, null), + null + )) + ); + + final ReindexingTimelineView original = new ReindexingTimelineView( + "testDS", + referenceTime, + skipOffset, + List.of(skippedInterval, activeInterval), + validationError + ); + + final String json = OBJECT_MAPPER.writeValueAsString(original); + final ReindexingTimelineView deserialized = OBJECT_MAPPER.readValue(json, ReindexingTimelineView.class); + + Assertions.assertEquals(original, deserialized); + } + + @Test + public void test_fromPlan_projectsIntervalsByDisposition() + { + final DateTime referenceTime = DateTimes.of("2025-01-29T00:00:00Z"); + + final ReindexingPartitioningRule partitioningRule = new ReindexingPartitioningRule( + "day-rule", + null, + Period.days(7), + Granularities.DAY, + new DynamicPartitionsSpec(null, null), + null + ); + + final Interval includedRange = Intervals.of("2024-12-01/2025-01-01"); + final Interval skippedBoundary = Intervals.of("2025-01-01/2025-01-29"); + final Interval skippedNoData = Intervals.of("2024-01-01/2024-12-01"); + + final ReindexingPlan plan = new ReindexingPlan( + "testDS", + referenceTime, + null, + List.of( + new ReindexingPlan.PlannedInterval( + skippedNoData, + skippedNoData, + ReindexingPlan.IntervalDisposition.SKIPPED_NO_DATA, + partitioningRule, + false, + Granularities.DAY, + 0, + null, + Collections.emptyList() + ), + new ReindexingPlan.PlannedInterval( + includedRange, + includedRange, + ReindexingPlan.IntervalDisposition.INCLUDED, + partitioningRule, + false, + Granularities.DAY, + 1, + InlineSchemaDataSourceCompactionConfig.builder().forDataSource("testDS").build(), + List.of((ReindexingRule) partitioningRule) + ), + new ReindexingPlan.PlannedInterval( + skippedBoundary, + skippedBoundary, + ReindexingPlan.IntervalDisposition.SKIPPED_BEYOND_BOUNDARY, + partitioningRule, + false, + Granularities.DAY, + 0, + null, + Collections.emptyList() + ) + ), + null + ); + + final ReindexingTimelineView view = ReindexingTimelineView.fromPlan(plan); + + // SKIPPED_NO_DATA should be omitted; INCLUDED and SKIPPED_BEYOND_BOUNDARY should be present. + Assertions.assertEquals(2, view.getIntervals().size()); + Assertions.assertEquals(includedRange, view.getIntervals().get(0).getInterval()); + Assertions.assertEquals(1, view.getIntervals().get(0).getRuleCount()); + Assertions.assertNotNull(view.getIntervals().get(0).getConfig()); + + Assertions.assertEquals(skippedBoundary, view.getIntervals().get(1).getInterval()); + Assertions.assertEquals(0, view.getIntervals().get(1).getRuleCount()); + Assertions.assertNull(view.getIntervals().get(1).getConfig()); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index 4ccf4659994f..005710503e39 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -26,6 +26,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.audit.AuditManager; +import org.apache.druid.error.DruidException; +import org.apache.druid.indexing.compact.CompactionScheduler; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; @@ -109,6 +111,9 @@ public class SupervisorResourceTest extends EasyMockSupport @Mock private AuditManager auditManager; + @Mock + private CompactionScheduler compactionScheduler; + private SupervisorResource supervisorResource; @Before @@ -145,7 +150,8 @@ public Authorizer getAuthorizer(String name) }, OBJECT_MAPPER, authConfig, - auditManager + auditManager, + compactionScheduler ); } @@ -204,9 +210,9 @@ public List getDataSources() @Override public void validateSpec() { - throw org.apache.druid.error.DruidException - .forPersona(org.apache.druid.error.DruidException.Persona.USER) - .ofCategory(org.apache.druid.error.DruidException.Category.INVALID_INPUT) + throw DruidException + .forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) .build("nope"); } }; @@ -1434,6 +1440,90 @@ public void testSpecPostMergeCarriesForwardEvenWhenExistingHasOnlyTaskCountMin() Assert.assertEquals(1, newSpec.getIoConfig().getTaskCount()); } + @Test + public void test_getReindexingTimeline_returnsTimelineFromScheduler() + { + final org.apache.druid.indexing.compact.ReindexingTimelineView view = + new org.apache.druid.indexing.compact.ReindexingTimelineView( + "datasource1", + org.apache.druid.java.util.common.DateTimes.of("2025-01-29T00:00:00Z"), + null, + java.util.Collections.emptyList(), + null + ); + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorSpec("autocompact__datasource1")) + .andReturn(Optional.of(new TestSupervisorSpec("autocompact__datasource1", null, null))); + EasyMock.expect(compactionScheduler.previewReindexingTimeline(EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn(view); + replayAll(); + + final Response response = supervisorResource.getReindexingTimeline("autocompact__datasource1", null); + Assert.assertEquals(200, response.getStatus()); + Assert.assertSame(view, response.getEntity()); + + verifyAll(); + } + + @Test + public void test_getReindexingTimeline_supervisorNotFound_returns404() + { + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorSpec("missing-id")).andReturn(Optional.absent()); + replayAll(); + + final Response response = supervisorResource.getReindexingTimeline("missing-id", null); + Assert.assertEquals(404, response.getStatus()); + verifyAll(); + } + + @Test + public void test_getReindexingTimeline_invalidReferenceTime_returns400() + { + replayAll(); + + final Response response = supervisorResource.getReindexingTimeline("any-id", "not-a-date"); + Assert.assertEquals(400, response.getStatus()); + @SuppressWarnings("unchecked") + Map entity = (Map) response.getEntity(); + Assert.assertTrue(entity.get("error").toString().contains("invalid format")); + + verifyAll(); + } + + @Test + public void test_getReindexingTimeline_schedulerRejectsSpec_returns400() + { + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorSpec("non-compaction")) + .andReturn(Optional.of(new TestSupervisorSpec("non-compaction", null, null))); + EasyMock.expect(compactionScheduler.previewReindexingTimeline(EasyMock.anyObject(), EasyMock.anyObject())) + .andThrow(DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build("not a compaction supervisor")); + replayAll(); + + final Response response = supervisorResource.getReindexingTimeline("non-compaction", null); + Assert.assertEquals(400, response.getStatus()); + @SuppressWarnings("unchecked") + Map entity = (Map) response.getEntity(); + Assert.assertTrue(entity.get("error").toString().contains("not a compaction supervisor")); + + verifyAll(); + } + + @Test + public void test_getReindexingTimeline_serviceUnavailable_returns503() + { + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()); + replayAll(); + + final Response response = supervisorResource.getReindexingTimeline("any-id", null); + Assert.assertEquals(503, response.getStatus()); + verifyAll(); + } + @Test public void test_handoffTaskGroups_returnsAccepted() {