Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions docs/api-reference/supervisor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1521,6 +1521,138 @@ In the example below, the outer key (`"0"`) is a **task group ID**; the inner ke

**For automation clients:** If you need to map task IDs to group IDs (for handoff, draining, or observability), use the `/stats` response keys directly instead of re-deriving group IDs from partition data. This avoids coupling to internal supervisor assignment logic.

### Get reindexing timeline for a supervisor

Returns the reindexing timeline for a compaction supervisor that uses a cascading reindexing template. The timeline shows how reindexing rules apply across time intervals, including skip-offset resolution and per-interval configuration.

:::info
This endpoint is only available for compaction supervisors that use a cascading reindexing template. The Overlord fetches the live segment timeline for the datasource and applies any configured `skipOffsetFromNow` or `skipOffsetFromLatest` so the response reflects what would actually be compacted on the next scheduler run.
:::

#### URL

`GET` `/druid/indexer/v1/supervisor/{supervisorId}/reindexingTimeline`

#### Query parameters

* `referenceTime` (optional)
* Type: String (ISO-8601 datetime)
* The reference time to use for computing the timeline. Defaults to the current UTC time if not specified.

#### Responses

<Tabs>

<TabItem value="57" label="200 SUCCESS">


*Successfully retrieved reindexing timeline*

</TabItem>
<TabItem value="58" label="400 BAD REQUEST">


*Supervisor is not a compaction supervisor, does not use a cascading reindexing template, or the `referenceTime` parameter is not a valid ISO-8601 datetime*

</TabItem>
<TabItem value="59" label="404 NOT FOUND">


*Invalid supervisor ID*

</TabItem>
</Tabs>

---

#### Sample request

The following example retrieves the reindexing timeline for a compaction supervisor on the datasource `my_datasource`. Compaction supervisor IDs follow the pattern `autocompact__<datasource>`.

<Tabs>

<TabItem value="60" label="cURL">


```shell
curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/autocompact__my_datasource/reindexingTimeline?referenceTime=2024-01-15T00:00:00.000Z"
```

</TabItem>
<TabItem value="61" label="HTTP">


```HTTP
GET /druid/indexer/v1/supervisor/autocompact__my_datasource/reindexingTimeline?referenceTime=2024-01-15T00:00:00.000Z HTTP/1.1
Host: http://ROUTER_IP:ROUTER_PORT
```

</TabItem>
</Tabs>

#### Sample response

<details>
<summary>View the response</summary>

```json
{
"dataSource": "my_datasource",
"referenceTime": "2024-01-15T00:00:00.000Z",
"skipOffset": {
"type": "skipOffsetFromNow",
"period": "P1D",
"effectiveEndTime": "2024-01-14T00:00:00.000Z"
},
"intervals": [
{
"interval": "2024-01-14T00:00:00.000Z/2024-01-15T00:00:00.000Z",
"ruleCount": 0,
"appliedRules": []
},
{
"interval": "2024-01-01T00:00:00.000Z/2024-01-14T00:00:00.000Z",
"ruleCount": 2,
"config": {
"dataSource": "my_datasource",
"taskPriority": 25,
"inputSegmentSizeBytes": 100000000000,
"maxRowsPerSegment": 5000000,
"skipOffsetFromLatest": "PT0S",
"tuningConfig": null,
"taskContext": null,
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "HOUR",
"rollup": true
},
"ioConfig": null
},
"appliedRules": [
{
"type": "partitioning",
"id": "compact-to-daily",
"description": "Compact to daily segments for data older than 1 day",
"olderThan": "P1D",
"segmentGranularity": "DAY"
},
{
"type": "dataSchema",
"id": "rollup-hourly",
"description": "Roll up to hourly query granularity for data older than 1 day",
"olderThan": "P1D",
"queryGranularity": "HOUR",
"rollup": true
}
]
}
]
}
```
</details>

If the supervisor's datasource has no segments yet, the response includes an empty `intervals` list rather than the configured rule timeline — the rule timeline has no data to be applied against.

## Audit history

An audit history provides a comprehensive log of events, including supervisor configuration, creation, suspension, and modification history.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.testing.embedded.compact;

import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.catalog.guice.CatalogClientModule;
import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
import org.apache.druid.common.utils.IdUtils;
Expand All @@ -41,6 +42,7 @@
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.compact.CascadingReindexingTemplate;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
import org.apache.druid.indexing.compact.ReindexingTimelineView;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
Expand All @@ -56,6 +58,7 @@
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.EqualityFilter;
import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.UpdateResponse;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.VirtualColumns;
Expand Down Expand Up @@ -102,6 +105,7 @@
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
Expand Down Expand Up @@ -443,6 +447,98 @@ public void test_cascadingCompactionTemplate_multiplePeriodsApplyDifferentCompac
verifyEventCountOlderThan(Period.days(7), "item", "hat", 0);
}

@Test
public void test_getReindexingTimeline_returnsTimelineForCascadingSupervisor()
{
final ReindexingPartitioningRule partitioningRule = new ReindexingPartitioningRule(
"dayRule",
"Compact to DAY granularity for data older than 7 days",
Period.days(7),
Granularities.DAY,
new DynamicPartitionsSpec(null, null),
null
);

final CascadingReindexingTemplate template = new CascadingReindexingTemplate(
dataSource,
null,
null,
InlineReindexingRuleProvider.builder()
.partitioningRules(List.of(partitioningRule))
.build(),
null,
null,
null,
Granularities.HOUR,
new DynamicPartitionsSpec(null, null),
null,
null
);
runCompactionWithSpec(template);

final String supervisorId = CompactionSupervisorSpec.getSupervisorIdForDatasource(dataSource);
final DateTime referenceTime = DateTimes.nowUtc();
final String url = StringUtils.format(
"/druid/indexer/v1/supervisor/%s/reindexingTimeline?referenceTime=%s",
StringUtils.urlEncode(supervisorId),
StringUtils.urlEncode(referenceTime.toString())
);

final ReindexingTimelineView timeline = cluster.callApi().serviceClient().onLeaderOverlord(
mapper -> new RequestBuilder(HttpMethod.GET, url),
new TypeReference<>() {}
);

Assertions.assertNotNull(timeline);
Assertions.assertEquals(dataSource, timeline.getDataSource());
Assertions.assertEquals(referenceTime, timeline.getReferenceTime());
Assertions.assertNull(timeline.getValidationError(), "Timeline should have no validation errors");
}

@Test
public void test_getReindexingTimeline_returns400ForNonCascadingSupervisor()
{
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(
new ClusterCompactionConfig(1.0, 100, null, true, CompactionEngine.MSQ, true)
)
);
Assertions.assertTrue(updateResponse.isSuccess());

final InlineSchemaDataSourceCompactionConfig inlineConfig =
InlineSchemaDataSourceCompactionConfig
.builder()
.forDataSource(dataSource)
.withSkipOffsetFromLatest(Period.seconds(0))
.withGranularitySpec(
new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null)
)
.withTuningConfig(
UserCompactionTaskQueryTuningConfig.builder().partitionsSpec(new DynamicPartitionsSpec(null, null)).build()
)
.build();

runCompactionWithSpec(inlineConfig);

final String supervisorId = CompactionSupervisorSpec.getSupervisorIdForDatasource(dataSource);
final String url = StringUtils.format(
"/druid/indexer/v1/supervisor/%s/reindexingTimeline",
StringUtils.urlEncode(supervisorId)
);

final RuntimeException exception = Assertions.assertThrows(
RuntimeException.class,
() -> cluster.callApi().serviceClient().onLeaderOverlord(
mapper -> new RequestBuilder(HttpMethod.GET, url),
new TypeReference<ReindexingTimelineView>() {}
)
);
Assertions.assertTrue(
exception.getMessage().contains("400 Bad Request"),
"Expected 400 Bad Request in error message but got: " + exception.getMessage()
);
}

@Test
public void test_cascadingReindexing_withVirtualColumnOnNestedData_filtersCorrectly()
{
Expand Down
Loading
Loading