Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/api-reference/legacy-metadata-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ Returns the percentage of segments actually loaded in the cluster versus segment

Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include segment replication counts.

Pass the optional query parameter `strictTierAwareSegmentLoad` with no value, or with value `true`, to make the default and `simple` responses consider a segment loaded only when at least one Historical replica is loaded in every tier with a positive replica count in the matching retention rule. Without this parameter, or with value `false`, a segment is considered loaded once any Historical replica is loaded.

`GET /druid/coordinator/v1/loadstatus?full`

Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes segment replication counts.
Expand Down Expand Up @@ -97,6 +99,16 @@ You can pass the optional query parameter `computeUsingClusterView` to factor in
the segments left to load. See [Coordinator Segment Loading](#segment-loading) for details.
If no used segments are found for the given inputs, this API returns `204 No Content`

For the default and `simple` datasource load status responses, pass the optional query parameter `strictTierAwareSegmentLoad` with no value, or with value `true`, to consider a segment loaded only when at least one Historical replica is loaded in every tier with a positive replica count in the matching retention rule. Without this parameter, or with value `false`, a segment is considered loaded once any Historical replica is loaded.

## Segment handoff

`GET /druid/coordinator/v1/datasources/{dataSourceName}/handoffComplete?interval={myInterval}&partitionNumber={partitionNumber}&version={version}`

Returns whether handoff is complete for a segment descriptor. A segment that is not eligible for load is considered complete. For a segment that is eligible for load, the default response is true once any Historical replica is loaded.

Pass the optional query parameter `strictTierAwareSegmentLoad` with no value, or with value `true`, to return true only when at least one Historical replica is loaded in every tier with a positive replica count in the matching retention rule.

## Metadata store information

:::info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param
|`indexSpecForIntermediatePersists`|Object|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.| no (default = same as `indexSpec`)|
|`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)|
|`handoffConditionTimeout`|Long| Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.| no (default == 0)|
|`strictTierAwareSegmentLoad`|Boolean|When checking segment handoff, require each segment to have at least one Historical replica loaded in every tier with a positive replica count in the matching retention rule. When false, any loaded Historical replica is enough.|no (default == false)|
|`resetOffsetAutomatically`|Boolean|Controls behavior when Druid needs to read RabbitMQ messages that are no longer available. Not supported. |no (default == false)|
|`skipSequenceNumberAvailabilityCheck`|Boolean|Whether to enable checking if the current sequence number is still available in a particular RabbitMQ stream. If set to false, the indexing task will attempt to reset the current sequence number (or not), depending on the value of `resetOffsetAutomatically`.|no (default == false)|
|`workerThreads`|Integer|The number of threads that the supervisor uses to handle requests/responses for worker tasks, along with any other internal asynchronous operation.|no (default == min(10, taskCount))|
Expand Down Expand Up @@ -235,4 +236,4 @@ In order to configure these, use the dynamic configuration provider of the ioCon
}
}
},
```
```
1 change: 1 addition & 0 deletions docs/ingestion/native-batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ The following table lists the properties of a `tuningConfig` object:
|`chatHandlerTimeout`|Timeout for reporting the pushed segments in worker tasks.|PT10S|no|
|`chatHandlerNumRetries`|Retries for reporting the pushed segments in worker tasks.|5|no|
|`awaitSegmentAvailabilityTimeoutMillis`|Milliseconds to wait for the newly indexed segments to become available for query after ingestion completes. If `<= 0`, no wait occurs. If `> 0`, the task waits for the Coordinator to indicate that the new segments are available for querying. If the timeout expires, the task exits as successful, but the segments are not confirmed as available for query.|Long|no (default = 0)|
|`strictTierAwareSegmentLoad`|When `awaitSegmentAvailabilityTimeoutMillis` is greater than 0, require each newly indexed segment to have at least one Historical replica loaded in every tier with a positive replica count in the matching retention rule before the segment is considered available. When false, any loaded Historical replica is enough.|false|no|

### Split Hint Spec

Expand Down
1 change: 1 addition & 0 deletions docs/ingestion/supervisor.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ For configuration properties specific to Kafka and Kinesis, see [Kafka tuning co
|`indexSpecForIntermediatePersists`|Object|Defines segment storage format options to use at indexing time for intermediate persisted temporary segments. You can use `indexSpecForIntermediatePersists` to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published.|No||
|`reportParseExceptions`|Boolean|DEPRECATED. If `true`, Druid throws exceptions encountered during parsing causing ingestion to halt. If `false`, Druid skips unparseable rows and fields. Setting `reportParseExceptions` to `true` overrides existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to not more than 1.|No|`false`|
|`handoffConditionTimeout`|Long|Number of milliseconds to wait for segment handoff. Set to a value >= 0, where 0 means to wait indefinitely.|No|900000 (15 minutes) for Kafka. 0 for Kinesis.|
|`strictTierAwareSegmentLoad`|Boolean|When checking segment handoff, require each segment to have at least one Historical replica loaded in every tier with a positive replica count in the matching retention rule. When false, any loaded Historical replica is enough.|No|`false`|
|`resetOffsetAutomatically`|Boolean|Resets partitions when the offset is unavailable. If set to `true`, Druid resets partitions to the earliest or latest offset, based on the value of `useEarliestOffset` or `useEarliestSequenceNumber` (earliest if `true`, latest if `false`). If set to `false`, Druid surfaces the exception causing tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation, potentially through [resetting the supervisor](../api-reference/supervisor-api.md#reset-a-supervisor).|No|`false`|
|`workerThreads`|Integer|The number of threads that the supervisor uses to handle requests/responses for worker tasks, along with any other internal asynchronous operation.|No|`min(10, taskCount)`|
|`chatRetries`|Integer|The number of times Druid retries HTTP requests to indexing tasks before considering tasks unresponsive.|No|8|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public RabbitStreamIndexTaskTuningConfig(
@Nullable Integer recordBufferSize,
@Nullable Integer recordBufferOfferTimeout,
@Nullable Integer maxRecordsPerPoll,
@Nullable Integer maxColumnsToMerge
@Nullable Integer maxColumnsToMerge,
@Nullable Boolean strictTierAwareSegmentLoad
)
{
super(
Expand All @@ -101,7 +102,8 @@ public RabbitStreamIndexTaskTuningConfig(
maxSavedParseExceptions,
numPersistThreads,
maxColumnsToMerge,
null
null,
strictTierAwareSegmentLoad
);

this.recordBufferSize = recordBufferSize;
Expand Down Expand Up @@ -135,7 +137,8 @@ private RabbitStreamIndexTaskTuningConfig(
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
@JsonProperty("strictTierAwareSegmentLoad") @Nullable Boolean strictTierAwareSegmentLoad
)
{
this(
Expand All @@ -162,7 +165,8 @@ private RabbitStreamIndexTaskTuningConfig(
recordBufferSize,
recordBufferOfferTimeout,
maxRecordsPerPoll,
maxColumnsToMerge
maxColumnsToMerge,
strictTierAwareSegmentLoad
);
}

Expand Down Expand Up @@ -234,7 +238,8 @@ public RabbitStreamIndexTaskTuningConfig withBasePersistDirectory(File dir)
getRecordBufferSizeConfigured(),
getRecordBufferOfferTimeout(),
getMaxRecordsPerPollConfigured(),
getMaxColumnsToMerge()
getMaxColumnsToMerge(),
isStrictTierAwareSegmentLoad()
);
}

Expand Down Expand Up @@ -262,6 +267,7 @@ public String toString()
", numPersistThreads=" + getNumPersistThreads() +
", maxRecordsPerPole=" + getMaxRecordsPerPollConfigured() +
", maxColumnsToMerge=" + getMaxColumnsToMerge() +
", strictTierAwareSegmentLoad=" + isStrictTierAwareSegmentLoad() +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public static RabbitStreamSupervisorTuningConfig defaultConfig()
null,
null,
null,
null,
null
);
}
Expand Down Expand Up @@ -101,7 +102,8 @@ public RabbitStreamSupervisorTuningConfig(
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
@JsonProperty("strictTierAwareSegmentLoad") @Nullable Boolean strictTierAwareSegmentLoad
)
{
super(
Expand All @@ -128,7 +130,8 @@ public RabbitStreamSupervisorTuningConfig(
recordBufferSize,
recordBufferOfferTimeout,
maxRecordsPerPoll,
maxColumnsToMerge
maxColumnsToMerge,
strictTierAwareSegmentLoad
);
this.workerThreads = workerThreads;
this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES);
Expand Down Expand Up @@ -215,6 +218,7 @@ public String toString()
", numPersistThreads=" + getNumPersistThreads() +
", maxRecordsPerPoll=" + getMaxRecordsPerPollConfigured() +
", maxColumnsToMerge=" + getMaxColumnsToMerge() +
", strictTierAwareSegmentLoad=" + isStrictTierAwareSegmentLoad() +
'}';
}

Expand All @@ -241,11 +245,12 @@ public RabbitStreamIndexTaskTuningConfig convertToTaskTuningConfig()
isLogParseExceptions(),
getMaxParseExceptions(),
getMaxSavedParseExceptions(),
getNumPersistThreads(),
getRecordBufferSizeConfigured(),
getRecordBufferOfferTimeout(),
getMaxRecordsPerPollConfigured(),
getNumPersistThreads(),
getMaxColumnsToMerge()
);
getMaxColumnsToMerge(),
isStrictTierAwareSegmentLoad()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void testSerdeWithDefaults() throws Exception

Assert.assertFalse(config.isSkipSequenceNumberAvailabilityCheck());
Assert.assertFalse(config.isResetOffsetAutomatically());
Assert.assertFalse(config.isStrictTierAwareSegmentLoad());
}

@Test
Expand All @@ -89,6 +90,7 @@ public void testSerdeWithNonDefaults() throws Exception
+ " \"maxPendingPersists\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"strictTierAwareSegmentLoad\": true,\n"
+ " \"recordBufferSize\": 1000,\n"
+ " \"recordBufferOfferTimeout\": 500,\n"
+ " \"resetOffsetAutomatically\": false,\n"
Expand Down Expand Up @@ -116,6 +118,7 @@ public void testSerdeWithNonDefaults() throws Exception
Assert.assertEquals(1000, config.getRecordBufferSizeOrDefault(1_000_000_000));
Assert.assertEquals(500, config.getRecordBufferOfferTimeout());
Assert.assertFalse(config.isResetOffsetAutomatically());
Assert.assertTrue(config.isStrictTierAwareSegmentLoad());
}


Expand Down Expand Up @@ -184,7 +187,8 @@ public void testtoString() throws Exception
"maxSavedParseExceptions=0, " +
"numPersistThreads=1, " +
"maxRecordsPerPoll=null, " +
"maxColumnsToMerge=-1}";
"maxColumnsToMerge=-1, " +
"strictTierAwareSegmentLoad=false}";


Assert.assertEquals(resStr, config.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public void setupTest()
null,
null,
100,
null,
null
);
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public void testSerdeWithDefaults() throws Exception
Assert.assertEquals(Duration.standardSeconds(80), config.getShutdownTimeout());
Assert.assertEquals(Duration.standardSeconds(120), config.getRepartitionTransitionDuration());
Assert.assertEquals(100, config.getMaxRecordsPerPollOrDefault());
Assert.assertFalse(config.isStrictTierAwareSegmentLoad());
}

@Test
Expand All @@ -86,6 +87,7 @@ public void testSerdeWithNonDefaults() throws Exception
+ " \"maxPendingPersists\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"strictTierAwareSegmentLoad\": true,\n"
+ " \"workerThreads\": 12,\n"
+ " \"chatRetries\": 14,\n"
+ " \"httpTimeout\": \"PT15S\",\n"
Expand All @@ -112,6 +114,7 @@ public void testSerdeWithNonDefaults() throws Exception
Assert.assertEquals(100, config.getMaxPendingPersists());
Assert.assertEquals(true, config.isReportParseExceptions());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertTrue(config.isStrictTierAwareSegmentLoad());
Assert.assertEquals(12, (int) config.getWorkerThreads());
Assert.assertEquals(14L, (long) config.getChatRetries());
Assert.assertEquals(15, (int) config.getRecordBufferSizeConfigured());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public KafkaIndexTaskTuningConfig(
@Nullable Integer maxSavedParseExceptions,
@Nullable Integer numPersistThreads,
@Nullable Integer maxColumnsToMerge,
@Nullable Boolean releaseLocksOnHandoff
@Nullable Boolean releaseLocksOnHandoff,
@Nullable Boolean strictTierAwareSegmentLoad
)
{
super(
Expand All @@ -80,7 +81,8 @@ public KafkaIndexTaskTuningConfig(
maxSavedParseExceptions,
numPersistThreads,
maxColumnsToMerge,
releaseLocksOnHandoff
releaseLocksOnHandoff,
strictTierAwareSegmentLoad
);
}

Expand All @@ -106,7 +108,8 @@ private KafkaIndexTaskTuningConfig(
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
@JsonProperty("releaseLocksOnHandoff") @Nullable Boolean releaseLocksOnHandoff
@JsonProperty("releaseLocksOnHandoff") @Nullable Boolean releaseLocksOnHandoff,
@JsonProperty("strictTierAwareSegmentLoad") @Nullable Boolean strictTierAwareSegmentLoad
)
{
this(
Expand All @@ -131,7 +134,8 @@ private KafkaIndexTaskTuningConfig(
maxSavedParseExceptions,
numPersistThreads,
maxColumnsToMerge,
releaseLocksOnHandoff
releaseLocksOnHandoff,
strictTierAwareSegmentLoad
);
}

Expand Down Expand Up @@ -160,7 +164,8 @@ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir)
getMaxSavedParseExceptions(),
getNumPersistThreads(),
getMaxColumnsToMerge(),
isReleaseLocksOnHandoff()
isReleaseLocksOnHandoff(),
isStrictTierAwareSegmentLoad()
);
}

Expand Down Expand Up @@ -188,6 +193,7 @@ public String toString()
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
", numPersistThreads=" + getNumPersistThreads() +
", getMaxColumnsToMerge=" + getMaxColumnsToMerge() +
", strictTierAwareSegmentLoad=" + isStrictTierAwareSegmentLoad() +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public static KafkaSupervisorTuningConfig defaultConfig()
null,
null,
null,
null,
null
);
}
Expand Down Expand Up @@ -97,7 +98,8 @@ public KafkaSupervisorTuningConfig(
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
@JsonProperty("releaseLocksOnHandoff") @Nullable Boolean releaseLocksOnHandoff
@JsonProperty("releaseLocksOnHandoff") @Nullable Boolean releaseLocksOnHandoff,
@JsonProperty("strictTierAwareSegmentLoad") @Nullable Boolean strictTierAwareSegmentLoad
)
{
super(
Expand All @@ -122,7 +124,8 @@ public KafkaSupervisorTuningConfig(
maxSavedParseExceptions,
numPersistThreads,
maxColumnsToMerge,
releaseLocksOnHandoff
releaseLocksOnHandoff,
strictTierAwareSegmentLoad
);
this.workerThreads = workerThreads;
this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES);
Expand Down Expand Up @@ -209,6 +212,7 @@ public String toString()
", maxParseExceptions=" + getMaxParseExceptions() +
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
", numPersistThreads=" + getNumPersistThreads() +
", strictTierAwareSegmentLoad=" + isStrictTierAwareSegmentLoad() +
'}';
}

Expand Down Expand Up @@ -237,7 +241,8 @@ public KafkaIndexTaskTuningConfig convertToTaskTuningConfig()
getMaxSavedParseExceptions(),
getNumPersistThreads(),
getMaxColumnsToMerge(),
isReleaseLocksOnHandoff()
isReleaseLocksOnHandoff(),
isStrictTierAwareSegmentLoad()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2901,6 +2901,7 @@ private KafkaIndexTask createTask(
maxSavedParseExceptions,
null,
null,
null,
null
);
if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
Expand Down
Loading
Loading