diff --git a/docs/api-reference/legacy-metadata-api.md b/docs/api-reference/legacy-metadata-api.md index d22be18a7ec9..ea667824dc0a 100644 --- a/docs/api-reference/legacy-metadata-api.md +++ b/docs/api-reference/legacy-metadata-api.md @@ -35,6 +35,8 @@ Returns the percentage of segments actually loaded in the cluster versus segment Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include segment replication counts. +Pass the optional query parameter `strictTierAwareSegmentLoad` with no value, or with value `true`, to make the default and `simple` responses consider a segment loaded only when at least one Historical replica is loaded in every tier with a positive replica count in the matching retention rule. Without this parameter, or with value `false`, a segment is considered loaded once any Historical replica is loaded. + `GET /druid/coordinator/v1/loadstatus?full` Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes segment replication counts. @@ -97,6 +99,16 @@ You can pass the optional query parameter `computeUsingClusterView` to factor in the segments left to load. See [Coordinator Segment Loading](#segment-loading) for details. If no used segments are found for the given inputs, this API returns `204 No Content` +For the default and `simple` datasource load status responses, pass the optional query parameter `strictTierAwareSegmentLoad` with no value, or with value `true`, to consider a segment loaded only when at least one Historical replica is loaded in every tier with a positive replica count in the matching retention rule. Without this parameter, or with value `false`, a segment is considered loaded once any Historical replica is loaded. + +## Segment handoff + +`GET /druid/coordinator/v1/datasources/{dataSourceName}/handoffComplete?interval={myInterval}&partitionNumber={partitionNumber}&version={version}` + +Returns whether handoff is complete for a segment descriptor. A segment that is not eligible for load is considered complete. For a segment that is eligible for load, the default response is true once any Historical replica is loaded. + +Pass the optional query parameter `strictTierAwareSegmentLoad` with no value, or with value `true`, to return true only when at least one Historical replica is loaded in every tier with a positive replica count in the matching retention rule. + ## Metadata store information :::info diff --git a/docs/development/extensions-contrib/rabbit-stream-ingestion.md b/docs/development/extensions-contrib/rabbit-stream-ingestion.md index 9c0e3951807b..39f88b996a04 100644 --- a/docs/development/extensions-contrib/rabbit-stream-ingestion.md +++ b/docs/development/extensions-contrib/rabbit-stream-ingestion.md @@ -157,6 +157,7 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param |`indexSpecForIntermediatePersists`|Object|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.| no (default = same as `indexSpec`)| |`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)| |`handoffConditionTimeout`|Long| Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.| no (default == 0)| +|`strictTierAwareSegmentLoad`|Boolean|When checking segment handoff, require each segment to have at least one Historical replica loaded in every tier with a positive replica count in the matching retention rule. When false, any loaded Historical replica is enough.|no (default == false)| |`resetOffsetAutomatically`|Boolean|Controls behavior when Druid needs to read RabbitMQ messages that are no longer available. Not supported. |no (default == false)| |`skipSequenceNumberAvailabilityCheck`|Boolean|Whether to enable checking if the current sequence number is still available in a particular RabbitMQ stream. If set to false, the indexing task will attempt to reset the current sequence number (or not), depending on the value of `resetOffsetAutomatically`.|no (default == false)| |`workerThreads`|Integer|The number of threads that the supervisor uses to handle requests/responses for worker tasks, along with any other internal asynchronous operation.|no (default == min(10, taskCount))| @@ -235,4 +236,4 @@ In order to configure these, use the dynamic configuration provider of the ioCon } } }, - ``` \ No newline at end of file + ``` diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 50eaf43366dc..bf1dc790ce11 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -276,6 +276,7 @@ The following table lists the properties of a `tuningConfig` object: |`chatHandlerTimeout`|Timeout for reporting the pushed segments in worker tasks.|PT10S|no| |`chatHandlerNumRetries`|Retries for reporting the pushed segments in worker tasks.|5|no| |`awaitSegmentAvailabilityTimeoutMillis`|Milliseconds to wait for the newly indexed segments to become available for query after ingestion completes. If `<= 0`, no wait occurs. If `> 0`, the task waits for the Coordinator to indicate that the new segments are available for querying. If the timeout expires, the task exits as successful, but the segments are not confirmed as available for query.|Long|no (default = 0)| +|`strictTierAwareSegmentLoad`|When `awaitSegmentAvailabilityTimeoutMillis` is greater than 0, require each newly indexed segment to have at least one Historical replica loaded in every tier with a positive replica count in the matching retention rule before the segment is considered available. When false, any loaded Historical replica is enough.|false|no| ### Split Hint Spec diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md index 4579532e09d7..f9d04632ee10 100644 --- a/docs/ingestion/supervisor.md +++ b/docs/ingestion/supervisor.md @@ -325,6 +325,7 @@ For configuration properties specific to Kafka and Kinesis, see [Kafka tuning co |`indexSpecForIntermediatePersists`|Object|Defines segment storage format options to use at indexing time for intermediate persisted temporary segments. You can use `indexSpecForIntermediatePersists` to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published.|No|| |`reportParseExceptions`|Boolean|DEPRECATED. If `true`, Druid throws exceptions encountered during parsing causing ingestion to halt. If `false`, Druid skips unparseable rows and fields. Setting `reportParseExceptions` to `true` overrides existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to not more than 1.|No|`false`| |`handoffConditionTimeout`|Long|Number of milliseconds to wait for segment handoff. Set to a value >= 0, where 0 means to wait indefinitely.|No|900000 (15 minutes) for Kafka. 0 for Kinesis.| +|`strictTierAwareSegmentLoad`|Boolean|When checking segment handoff, require each segment to have at least one Historical replica loaded in every tier with a positive replica count in the matching retention rule. When false, any loaded Historical replica is enough.|No|`false`| |`resetOffsetAutomatically`|Boolean|Resets partitions when the offset is unavailable. If set to `true`, Druid resets partitions to the earliest or latest offset, based on the value of `useEarliestOffset` or `useEarliestSequenceNumber` (earliest if `true`, latest if `false`). If set to `false`, Druid surfaces the exception causing tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation, potentially through [resetting the supervisor](../api-reference/supervisor-api.md#reset-a-supervisor).|No|`false`| |`workerThreads`|Integer|The number of threads that the supervisor uses to handle requests/responses for worker tasks, along with any other internal asynchronous operation.|No|`min(10, taskCount)`| |`chatRetries`|Integer|The number of times Druid retries HTTP requests to indexing tasks before considering tasks unresponsive.|No|8| diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java index dd5bc84d5239..a4db31314255 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java @@ -75,7 +75,8 @@ public RabbitStreamIndexTaskTuningConfig( @Nullable Integer recordBufferSize, @Nullable Integer recordBufferOfferTimeout, @Nullable Integer maxRecordsPerPoll, - @Nullable Integer maxColumnsToMerge + @Nullable Integer maxColumnsToMerge, + @Nullable Boolean strictTierAwareSegmentLoad ) { super( @@ -101,7 +102,8 @@ public RabbitStreamIndexTaskTuningConfig( maxSavedParseExceptions, numPersistThreads, maxColumnsToMerge, - null + null, + strictTierAwareSegmentLoad ); this.recordBufferSize = recordBufferSize; @@ -135,7 +137,8 @@ private RabbitStreamIndexTaskTuningConfig( @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads, @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, - @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge + @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, + @JsonProperty("strictTierAwareSegmentLoad") @Nullable Boolean strictTierAwareSegmentLoad ) { this( @@ -162,7 +165,8 @@ private RabbitStreamIndexTaskTuningConfig( recordBufferSize, recordBufferOfferTimeout, maxRecordsPerPoll, - maxColumnsToMerge + maxColumnsToMerge, + strictTierAwareSegmentLoad ); } @@ -234,7 +238,8 @@ public RabbitStreamIndexTaskTuningConfig withBasePersistDirectory(File dir) getRecordBufferSizeConfigured(), getRecordBufferOfferTimeout(), getMaxRecordsPerPollConfigured(), - getMaxColumnsToMerge() + getMaxColumnsToMerge(), + isStrictTierAwareSegmentLoad() ); } @@ -262,6 +267,7 @@ public String toString() ", numPersistThreads=" + getNumPersistThreads() + ", maxRecordsPerPole=" + getMaxRecordsPerPollConfigured() + ", maxColumnsToMerge=" + getMaxColumnsToMerge() + + ", strictTierAwareSegmentLoad=" + isStrictTierAwareSegmentLoad() + '}'; } diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfig.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfig.java index a2667026fffd..6165f2a92dd4 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfig.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfig.java @@ -69,6 +69,7 @@ public static RabbitStreamSupervisorTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -101,7 +102,8 @@ public RabbitStreamSupervisorTuningConfig( @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, - @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge + @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, + @JsonProperty("strictTierAwareSegmentLoad") @Nullable Boolean strictTierAwareSegmentLoad ) { super( @@ -128,7 +130,8 @@ public RabbitStreamSupervisorTuningConfig( recordBufferSize, recordBufferOfferTimeout, maxRecordsPerPoll, - maxColumnsToMerge + maxColumnsToMerge, + strictTierAwareSegmentLoad ); this.workerThreads = workerThreads; this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES); @@ -215,6 +218,7 @@ public String toString() ", numPersistThreads=" + getNumPersistThreads() + ", maxRecordsPerPoll=" + getMaxRecordsPerPollConfigured() + ", maxColumnsToMerge=" + getMaxColumnsToMerge() + + ", strictTierAwareSegmentLoad=" + isStrictTierAwareSegmentLoad() + '}'; } @@ -241,11 +245,12 @@ public RabbitStreamIndexTaskTuningConfig convertToTaskTuningConfig() isLogParseExceptions(), getMaxParseExceptions(), getMaxSavedParseExceptions(), + getNumPersistThreads(), getRecordBufferSizeConfigured(), getRecordBufferOfferTimeout(), getMaxRecordsPerPollConfigured(), - getNumPersistThreads(), - getMaxColumnsToMerge() - ); + getMaxColumnsToMerge(), + isStrictTierAwareSegmentLoad() + ); } } diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfigTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfigTest.java index 19930d1f9d54..dfad062ba97d 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfigTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfigTest.java @@ -75,6 +75,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertFalse(config.isSkipSequenceNumberAvailabilityCheck()); Assert.assertFalse(config.isResetOffsetAutomatically()); + Assert.assertFalse(config.isStrictTierAwareSegmentLoad()); } @Test @@ -89,6 +90,7 @@ public void testSerdeWithNonDefaults() throws Exception + " \"maxPendingPersists\": 100,\n" + " \"reportParseExceptions\": true,\n" + " \"handoffConditionTimeout\": 100,\n" + + " \"strictTierAwareSegmentLoad\": true,\n" + " \"recordBufferSize\": 1000,\n" + " \"recordBufferOfferTimeout\": 500,\n" + " \"resetOffsetAutomatically\": false,\n" @@ -116,6 +118,7 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(1000, config.getRecordBufferSizeOrDefault(1_000_000_000)); Assert.assertEquals(500, config.getRecordBufferOfferTimeout()); Assert.assertFalse(config.isResetOffsetAutomatically()); + Assert.assertTrue(config.isStrictTierAwareSegmentLoad()); } @@ -184,7 +187,8 @@ public void testtoString() throws Exception "maxSavedParseExceptions=0, " + "numPersistThreads=1, " + "maxRecordsPerPoll=null, " + - "maxColumnsToMerge=-1}"; + "maxColumnsToMerge=-1, " + + "strictTierAwareSegmentLoad=false}"; Assert.assertEquals(resStr, config.toString()); diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java index 82ee7afd0e01..128e9e978bde 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java @@ -168,6 +168,7 @@ public void setupTest() null, null, 100, + null, null ); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfigTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfigTest.java index 697306b175f3..118c1f767f37 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfigTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfigTest.java @@ -72,6 +72,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(Duration.standardSeconds(80), config.getShutdownTimeout()); Assert.assertEquals(Duration.standardSeconds(120), config.getRepartitionTransitionDuration()); Assert.assertEquals(100, config.getMaxRecordsPerPollOrDefault()); + Assert.assertFalse(config.isStrictTierAwareSegmentLoad()); } @Test @@ -86,6 +87,7 @@ public void testSerdeWithNonDefaults() throws Exception + " \"maxPendingPersists\": 100,\n" + " \"reportParseExceptions\": true,\n" + " \"handoffConditionTimeout\": 100,\n" + + " \"strictTierAwareSegmentLoad\": true,\n" + " \"workerThreads\": 12,\n" + " \"chatRetries\": 14,\n" + " \"httpTimeout\": \"PT15S\",\n" @@ -112,6 +114,7 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(100, config.getMaxPendingPersists()); Assert.assertEquals(true, config.isReportParseExceptions()); Assert.assertEquals(100, config.getHandoffConditionTimeout()); + Assert.assertTrue(config.isStrictTierAwareSegmentLoad()); Assert.assertEquals(12, (int) config.getWorkerThreads()); Assert.assertEquals(14L, (long) config.getChatRetries()); Assert.assertEquals(15, (int) config.getRecordBufferSizeConfigured()); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java index 3fa046b63e6d..a909fbd29cc1 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java @@ -54,7 +54,8 @@ public KafkaIndexTaskTuningConfig( @Nullable Integer maxSavedParseExceptions, @Nullable Integer numPersistThreads, @Nullable Integer maxColumnsToMerge, - @Nullable Boolean releaseLocksOnHandoff + @Nullable Boolean releaseLocksOnHandoff, + @Nullable Boolean strictTierAwareSegmentLoad ) { super( @@ -80,7 +81,8 @@ public KafkaIndexTaskTuningConfig( maxSavedParseExceptions, numPersistThreads, maxColumnsToMerge, - releaseLocksOnHandoff + releaseLocksOnHandoff, + strictTierAwareSegmentLoad ); } @@ -106,7 +108,8 @@ private KafkaIndexTaskTuningConfig( @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads, @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, - @JsonProperty("releaseLocksOnHandoff") @Nullable Boolean releaseLocksOnHandoff + @JsonProperty("releaseLocksOnHandoff") @Nullable Boolean releaseLocksOnHandoff, + @JsonProperty("strictTierAwareSegmentLoad") @Nullable Boolean strictTierAwareSegmentLoad ) { this( @@ -131,7 +134,8 @@ private KafkaIndexTaskTuningConfig( maxSavedParseExceptions, numPersistThreads, maxColumnsToMerge, - releaseLocksOnHandoff + releaseLocksOnHandoff, + strictTierAwareSegmentLoad ); } @@ -160,7 +164,8 @@ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) getMaxSavedParseExceptions(), getNumPersistThreads(), getMaxColumnsToMerge(), - isReleaseLocksOnHandoff() + isReleaseLocksOnHandoff(), + isStrictTierAwareSegmentLoad() ); } @@ -188,6 +193,7 @@ public String toString() ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + ", numPersistThreads=" + getNumPersistThreads() + ", getMaxColumnsToMerge=" + getMaxColumnsToMerge() + + ", strictTierAwareSegmentLoad=" + isStrictTierAwareSegmentLoad() + '}'; } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index c4a21674d301..2bdf82b107e3 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -67,6 +67,7 @@ public static KafkaSupervisorTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -97,7 +98,8 @@ public KafkaSupervisorTuningConfig( @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads, @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, - @JsonProperty("releaseLocksOnHandoff") @Nullable Boolean releaseLocksOnHandoff + @JsonProperty("releaseLocksOnHandoff") @Nullable Boolean releaseLocksOnHandoff, + @JsonProperty("strictTierAwareSegmentLoad") @Nullable Boolean strictTierAwareSegmentLoad ) { super( @@ -122,7 +124,8 @@ public KafkaSupervisorTuningConfig( maxSavedParseExceptions, numPersistThreads, maxColumnsToMerge, - releaseLocksOnHandoff + releaseLocksOnHandoff, + strictTierAwareSegmentLoad ); this.workerThreads = workerThreads; this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES); @@ -209,6 +212,7 @@ public String toString() ", maxParseExceptions=" + getMaxParseExceptions() + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + ", numPersistThreads=" + getNumPersistThreads() + + ", strictTierAwareSegmentLoad=" + isStrictTierAwareSegmentLoad() + '}'; } @@ -237,7 +241,8 @@ public KafkaIndexTaskTuningConfig convertToTaskTuningConfig() getMaxSavedParseExceptions(), getNumPersistThreads(), getMaxColumnsToMerge(), - isReleaseLocksOnHandoff() + isReleaseLocksOnHandoff(), + isStrictTierAwareSegmentLoad() ); } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index bb40602be7ba..49822efee14d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2901,6 +2901,7 @@ private KafkaIndexTask createTask( maxSavedParseExceptions, null, null, + null, null ); if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java index 0083e76ea75c..8789b8bbafa6 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java @@ -75,6 +75,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout()); Assert.assertEquals(1, config.getNumPersistThreads()); Assert.assertEquals(-1, config.getMaxColumnsToMerge()); + Assert.assertFalse(config.isStrictTierAwareSegmentLoad()); } @Test @@ -90,6 +91,7 @@ public void testSerdeWithNonDefaults() throws Exception + " \"maxPendingPersists\": 100,\n" + " \"reportParseExceptions\": true,\n" + " \"handoffConditionTimeout\": 100,\n" + + " \"strictTierAwareSegmentLoad\": true,\n" + " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n" + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n" + " \"appendableIndexSpec\": { \"type\" : \"onheap\" },\n" @@ -116,6 +118,7 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(100, config.getMaxPendingPersists()); Assert.assertEquals(true, config.isReportParseExceptions()); Assert.assertEquals(100, config.getHandoffConditionTimeout()); + Assert.assertTrue(config.isStrictTierAwareSegmentLoad()); Assert.assertEquals( IndexSpec.builder().withMetricCompression(CompressionStrategy.NONE).build(), config.getIndexSpec() @@ -143,6 +146,7 @@ public void testConvert() .withIndexSpecForIntermediatePersists(IndexSpec.getDefault()) .withReportParseExceptions(true) .withMaxColumnsToMerge(5) + .withStrictTierAwareSegmentLoad(true) .build(); KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig(); @@ -159,6 +163,7 @@ public void testConvert() Assert.assertEquals(5L, copy.getHandoffConditionTimeout()); Assert.assertEquals(2, copy.getNumPersistThreads()); Assert.assertEquals(5, copy.getMaxColumnsToMerge()); + Assert.assertTrue(copy.isStrictTierAwareSegmentLoad()); } @Test @@ -186,7 +191,8 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException 42, 2, -1, - false + false, + true ); String serialized = mapper.writeValueAsString(base); @@ -213,6 +219,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions()); Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads()); Assert.assertEquals(base.getMaxColumnsToMerge(), deserialized.getMaxColumnsToMerge()); + Assert.assertEquals(base.isStrictTierAwareSegmentLoad(), deserialized.isStrictTierAwareSegmentLoad()); } @Test @@ -239,6 +246,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException 42, 2, -1, + true, "extra string" ); @@ -265,6 +273,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions()); Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads()); Assert.assertEquals(base.getMaxColumnsToMerge(), deserialized.getMaxColumnsToMerge()); + Assert.assertEquals(base.isStrictTierAwareSegmentLoad(), deserialized.isStrictTierAwareSegmentLoad()); } @Test diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java index 7c13dcbfb4a3..c4e3431f5369 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java @@ -71,6 +71,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(Duration.standardSeconds(10), config.getHttpTimeout()); Assert.assertEquals(Duration.standardSeconds(80), config.getShutdownTimeout()); Assert.assertEquals(Duration.standardSeconds(30), config.getOffsetFetchPeriod()); + Assert.assertFalse(config.isStrictTierAwareSegmentLoad()); } @Test @@ -85,6 +86,7 @@ public void testSerdeWithNonDefaults() throws Exception + " \"maxPendingPersists\": 100,\n" + " \"reportParseExceptions\": true,\n" + " \"handoffConditionTimeout\": 100,\n" + + " \"strictTierAwareSegmentLoad\": true,\n" + " \"workerThreads\": 12,\n" + " \"chatRetries\": 14,\n" + " \"httpTimeout\": \"PT15S\",\n" @@ -113,6 +115,7 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(100, config.getMaxPendingPersists()); Assert.assertTrue(config.isReportParseExceptions()); Assert.assertEquals(100, config.getHandoffConditionTimeout()); + Assert.assertTrue(config.isStrictTierAwareSegmentLoad()); Assert.assertEquals(12, (int) config.getWorkerThreads()); Assert.assertEquals(14L, (long) config.getChatRetries()); Assert.assertEquals(Duration.standardSeconds(15), config.getHttpTimeout()); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaTuningConfigBuilder.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaTuningConfigBuilder.java index 44113519c9c9..200f637dab87 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaTuningConfigBuilder.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaTuningConfigBuilder.java @@ -113,7 +113,8 @@ public KafkaSupervisorTuningConfig build() maxSavedParseExceptions, numPersistThreads, maxColumnsToMerge, - releaseLocksOnHandoff + releaseLocksOnHandoff, + strictTierAwareSegmentLoad ); } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java index 08a083fe1124..796da9861e1d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java @@ -57,6 +57,7 @@ public TestModifiedKafkaIndexTaskTuningConfig( @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads, @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, + @JsonProperty("strictTierAwareSegmentLoad") @Nullable Boolean strictTierAwareSegmentLoad, @JsonProperty("extra") String extra ) { @@ -82,7 +83,8 @@ public TestModifiedKafkaIndexTaskTuningConfig( maxSavedParseExceptions, numPersistThreads, maxColumnsToMerge, - null + null, + strictTierAwareSegmentLoad ); this.extra = extra; } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java index 5720ce2ae22a..e1d9911ecc98 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java @@ -87,7 +87,8 @@ public KinesisIndexTaskTuningConfig( @Deprecated @Nullable Integer maxRecordsPerPoll, @Nullable Integer maxBytesPerPoll, @Nullable Period intermediateHandoffPeriod, - @Nullable Integer maxColumnsToMerge + @Nullable Integer maxColumnsToMerge, + @Nullable Boolean strictTierAwareSegmentLoad ) { super( @@ -113,7 +114,8 @@ public KinesisIndexTaskTuningConfig( maxSavedParseExceptions, null, maxColumnsToMerge, - false + false, + strictTierAwareSegmentLoad ); this.recordBufferSize = recordBufferSize; this.recordBufferSizeBytes = recordBufferSizeBytes; @@ -159,7 +161,8 @@ private KinesisIndexTaskTuningConfig( @JsonProperty("maxRecordsPerPoll") @Deprecated @Nullable Integer maxRecordsPerPoll, @JsonProperty("maxBytesPerPoll") @Nullable Integer maxBytesPerPoll, @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, - @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge + @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, + @JsonProperty("strictTierAwareSegmentLoad") @Nullable Boolean strictTierAwareSegmentLoad ) { this( @@ -190,7 +193,8 @@ private KinesisIndexTaskTuningConfig( maxRecordsPerPoll, maxBytesPerPoll, intermediateHandoffPeriod, - maxColumnsToMerge + maxColumnsToMerge, + strictTierAwareSegmentLoad ); } @@ -294,7 +298,8 @@ public KinesisIndexTaskTuningConfig withBasePersistDirectory(File dir) getMaxRecordsPerPollConfigured(), getMaxBytesPerPollConfigured(), getIntermediateHandoffPeriod(), - getMaxColumnsToMerge() + getMaxColumnsToMerge(), + isStrictTierAwareSegmentLoad() ); } @@ -364,6 +369,7 @@ public String toString() ", maxBytesPerPoll=" + maxBytesPerPoll + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + ", maxColumnsToMerge=" + getMaxColumnsToMerge() + + ", strictTierAwareSegmentLoad=" + isStrictTierAwareSegmentLoad() + '}'; } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index 1a11f8d658b7..46ab415ca451 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -77,6 +77,7 @@ public static KinesisSupervisorTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -115,7 +116,8 @@ public KinesisSupervisorTuningConfig( @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration, @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod, @JsonProperty("useListShards") Boolean useListShards, - @JsonProperty("maxColumnsToMerge") Integer maxColumnsToMerge + @JsonProperty("maxColumnsToMerge") Integer maxColumnsToMerge, + @JsonProperty("strictTierAwareSegmentLoad") @Nullable Boolean strictTierAwareSegmentLoad ) { super( @@ -146,7 +148,8 @@ public KinesisSupervisorTuningConfig( maxRecordsPerPoll, maxBytesPerPoll, intermediateHandoffPeriod, - maxColumnsToMerge + maxColumnsToMerge, + strictTierAwareSegmentLoad ); this.workerThreads = workerThreads; @@ -248,6 +251,7 @@ public String toString() ", repartitionTransitionDuration=" + getRepartitionTransitionDuration() + ", useListShards=" + isUseListShards() + ", maxColumnsToMerge=" + getMaxColumnsToMerge() + + ", strictTierAwareSegmentLoad=" + isStrictTierAwareSegmentLoad() + '}'; } @@ -282,7 +286,8 @@ public KinesisIndexTaskTuningConfig convertToTaskTuningConfig() getMaxRecordsPerPollConfigured(), getMaxBytesPerPollConfigured(), getIntermediateHandoffPeriod(), - getMaxColumnsToMerge() + getMaxColumnsToMerge(), + isStrictTierAwareSegmentLoad() ); } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java index 3089a39537cd..5a8552c36867 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java @@ -82,6 +82,7 @@ public class KinesisIndexTaskSerdeTest null, null, null, + null, null ); private static final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index f4ccfd4fc702..ead54e4779ef 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2346,6 +2346,7 @@ private KinesisIndexTask createTask( maxRecordsPerPoll, maxBytesPerPoll, intermediateHandoffPeriod, + null, null ); return createTask(taskId, dataSchema, ioConfig, tuningConfig, context); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index 5afbb4e78233..05d0def9e367 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -86,6 +86,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertNull(config.getFetchThreads()); Assert.assertFalse(config.isSkipSequenceNumberAvailabilityCheck()); Assert.assertFalse(config.isResetOffsetAutomatically()); + Assert.assertFalse(config.isStrictTierAwareSegmentLoad()); } @Test @@ -105,6 +106,7 @@ public void testSerdeWithNonDefaults() throws Exception + " \"recordBufferFullWait\": 500,\n" + " \"resetOffsetAutomatically\": false,\n" + " \"skipSequenceNumberAvailabilityCheck\": true,\n" + + " \"strictTierAwareSegmentLoad\": true,\n" + " \"fetchThreads\": 2,\n" + " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n" + "}"; @@ -135,6 +137,7 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertTrue(config.isSkipSequenceNumberAvailabilityCheck()); Assert.assertFalse(config.isResetOffsetAutomatically()); Assert.assertEquals(-1, config.getMaxColumnsToMerge()); + Assert.assertTrue(config.isStrictTierAwareSegmentLoad()); } @@ -169,7 +172,8 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException 6000, 1_000_000, new Period("P3D"), - 1000 + 1000, + true ); String serialized = mapper.writeValueAsString(base); @@ -201,6 +205,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured()); Assert.assertEquals(base.getMaxBytesPerPollConfigured(), deserialized.getMaxBytesPerPollConfigured()); Assert.assertEquals(base.getMaxColumnsToMerge(), deserialized.getMaxColumnsToMerge()); + Assert.assertEquals(base.isStrictTierAwareSegmentLoad(), deserialized.isStrictTierAwareSegmentLoad()); } @Test @@ -234,7 +239,8 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException 1_000_000, 6000, new Period("P3D"), - 1000 + 1000, + true ); String serialized = mapper.writeValueAsString(new TestModifiedKinesisIndexTaskTuningConfig(base, "loool")); @@ -263,6 +269,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException Assert.assertEquals(base.getRecordBufferSizeBytesConfigured(), deserialized.getRecordBufferSizeBytesConfigured()); Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured()); Assert.assertEquals(base.getMaxColumnsToMerge(), deserialized.getMaxColumnsToMerge()); + Assert.assertEquals(base.isStrictTierAwareSegmentLoad(), deserialized.isStrictTierAwareSegmentLoad()); } @Test @@ -329,7 +336,8 @@ public void testConvert() null, null, null, - null + null, + true ); KinesisIndexTaskTuningConfig copy = original.convertToTaskTuningConfig(); @@ -353,6 +361,7 @@ public void testConvert() Assert.assertEquals(10, (int) copy.getMaxRecordsPerPollConfigured()); Assert.assertEquals(new Period().withDays(Integer.MAX_VALUE), copy.getIntermediateHandoffPeriod()); Assert.assertEquals(-1, copy.getMaxColumnsToMerge()); + Assert.assertTrue(copy.isStrictTierAwareSegmentLoad()); } @Test diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 3f6da8aa0532..5c674700e18c 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -213,6 +213,7 @@ public void setupTest() null, null, null, + null, null ); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); @@ -4020,6 +4021,7 @@ public void testIsTaskCurrent() null, null, null, + null, null ); @@ -5435,6 +5437,7 @@ public SeekableStreamIndexTaskClient build( null, null, null, + null, null ); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java index 350f32003b2c..db354a17a413 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java @@ -69,6 +69,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(Duration.standardSeconds(10), config.getHttpTimeout()); Assert.assertEquals(Duration.standardSeconds(80), config.getShutdownTimeout()); Assert.assertEquals(Duration.standardSeconds(120), config.getRepartitionTransitionDuration()); + Assert.assertFalse(config.isStrictTierAwareSegmentLoad()); } @Test @@ -83,6 +84,7 @@ public void testSerdeWithNonDefaults() throws Exception + " \"maxPendingPersists\": 100,\n" + " \"reportParseExceptions\": true,\n" + " \"handoffConditionTimeout\": 100,\n" + + " \"strictTierAwareSegmentLoad\": true,\n" + " \"workerThreads\": 12,\n" + " \"chatRetries\": 14,\n" + " \"httpTimeout\": \"PT15S\",\n" @@ -109,6 +111,7 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(100, config.getMaxPendingPersists()); Assert.assertEquals(true, config.isReportParseExceptions()); Assert.assertEquals(100, config.getHandoffConditionTimeout()); + Assert.assertTrue(config.isStrictTierAwareSegmentLoad()); Assert.assertEquals(12, (int) config.getWorkerThreads()); Assert.assertEquals(14L, (long) config.getChatRetries()); Assert.assertEquals(Duration.standardSeconds(15), config.getHttpTimeout()); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java index b85c3edc2a65..bec92b00f891 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java @@ -64,6 +64,7 @@ public TestModifiedKinesisIndexTaskTuningConfig( @JsonProperty("maxBytesPerPoll") @Nullable Integer maxBytesPerPoll, @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, + @JsonProperty("strictTierAwareSegmentLoad") @Nullable Boolean strictTierAwareSegmentLoad, @JsonProperty("extra") String extra ) { @@ -95,7 +96,8 @@ public TestModifiedKinesisIndexTaskTuningConfig( maxRecordsPerPoll, maxBytesPerPoll, intermediateHandoffPeriod, - maxColumnsToMerge + maxColumnsToMerge, + strictTierAwareSegmentLoad ); this.extra = extra; } @@ -130,7 +132,8 @@ public TestModifiedKinesisIndexTaskTuningConfig(KinesisIndexTaskTuningConfig bas base.getMaxRecordsPerPollConfigured(), base.getMaxBytesPerPollConfigured(), base.getIntermediateHandoffPeriod(), - base.getMaxColumnsToMerge() + base.getMaxColumnsToMerge(), + base.isStrictTierAwareSegmentLoad() ); this.extra = extra; } diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java index ffed3e2d6a5f..d9f865b49395 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java @@ -109,6 +109,7 @@ public static Task getTask() null, null, 1L, + null, null ) ), diff --git a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyCoordinatorClient.java b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyCoordinatorClient.java index c03478f33fae..dd2843a4018d 100644 --- a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyCoordinatorClient.java +++ b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyCoordinatorClient.java @@ -74,7 +74,17 @@ public FaultyCoordinatorClient( } @Override - public ListenableFuture isHandoffComplete(String dataSource, SegmentDescriptor descriptor) + public ListenableFuture isHandoffComplete(final String dataSource, final SegmentDescriptor descriptor) + { + return isHandoffComplete(dataSource, descriptor, false); + } + + @Override + public ListenableFuture isHandoffComplete( + final String dataSource, + final SegmentDescriptor descriptor, + final boolean strictTierAwareSegmentLoad + ) { final Duration minHandoffDelay = getHandoffDelay(); if (minHandoffDelay != null) { @@ -98,7 +108,7 @@ public ListenableFuture isHandoffComplete(String dataSource, SegmentDes } // Call Coordinator for the actual handoff status - return super.isHandoffComplete(dataSource, descriptor); + return super.isHandoffComplete(dataSource, descriptor, strictTierAwareSegmentLoad); } private Duration getHandoffDelay() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 68ece19f87af..dcdd0df9bd56 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -70,6 +70,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.handoff.SegmentHandoffNotifier; +import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.indexing.DataSchema; @@ -719,6 +720,16 @@ protected boolean waitForSegmentAvailability( List segmentsToWaitFor, long waitTimeout ) + { + return waitForSegmentAvailability(toolbox, segmentsToWaitFor, waitTimeout, false); + } + + protected boolean waitForSegmentAvailability( + TaskToolbox toolbox, + List segmentsToWaitFor, + long waitTimeout, + boolean strictTierAwareSegmentLoad + ) { if (segmentsToWaitFor.isEmpty()) { log.info("No segments to wait for availability."); @@ -729,13 +740,13 @@ protected boolean waitForSegmentAvailability( } log.info("Waiting for [%d] segments to be loaded by the cluster...", segmentsToWaitFor.size()); final Stopwatch stopwatch = Stopwatch.createStarted(); + final SegmentHandoffNotifierFactory notifierFactory = toolbox.getSegmentHandoffNotifierFactory(); + final String dataSource = segmentsToWaitFor.get(0).getDataSource(); try ( - SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory() - .createSegmentHandoffNotifier( - segmentsToWaitFor.get(0).getDataSource(), - getId() - ) + SegmentHandoffNotifier notifier = strictTierAwareSegmentLoad + ? notifierFactory.createSegmentHandoffNotifier(dataSource, getId(), true) + : notifierFactory.createSegmentHandoffNotifier(dataSource, getId()) ) { final ExecutorService exec = Execs.directExecutor(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 82310272b369..079a296f7e1c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -319,6 +319,7 @@ static CompactionTuningConfig getTuningConfig(TuningConfig tuningConfig) parallelIndexTuningConfig.getMaxSavedParseExceptions(), parallelIndexTuningConfig.getMaxColumnsToMerge(), parallelIndexTuningConfig.getAwaitSegmentAvailabilityTimeoutMillis(), + parallelIndexTuningConfig.isStrictTierAwareSegmentLoad(), parallelIndexTuningConfig.getNumPersistThreads() ); } else if (tuningConfig instanceof IndexTuningConfig) { @@ -354,6 +355,7 @@ static CompactionTuningConfig getTuningConfig(TuningConfig tuningConfig) indexTuningConfig.getMaxSavedParseExceptions(), indexTuningConfig.getMaxColumnsToMerge(), indexTuningConfig.getAwaitSegmentAvailabilityTimeoutMillis(), + indexTuningConfig.isStrictTierAwareSegmentLoad(), indexTuningConfig.getNumPersistThreads() ); } else { @@ -1559,6 +1561,7 @@ public static CompactionTuningConfig defaultConfig() null, null, 0L, + null, null ); } @@ -1596,6 +1599,7 @@ public CompactionTuningConfig( @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis, + @JsonProperty("strictTierAwareSegmentLoad") @Nullable Boolean strictTierAwareSegmentLoad, @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { @@ -1630,6 +1634,7 @@ public CompactionTuningConfig( maxSavedParseExceptions, maxColumnsToMerge, awaitSegmentAvailabilityTimeoutMillis, + strictTierAwareSegmentLoad, null, numPersistThreads ); @@ -1676,6 +1681,7 @@ public CompactionTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) getMaxSavedParseExceptions(), getMaxColumnsToMerge(), getAwaitSegmentAvailabilityTimeoutMillis(), + isStrictTierAwareSegmentLoad(), getNumPersistThreads() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index eb3b5c0e84f8..3fe58678b99f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -958,7 +958,8 @@ private TaskStatus generateAndPublishSegments( waitForSegmentAvailability( toolbox, segmentsToWaitFor, - tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis() + tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis(), + tuningConfig.isStrictTierAwareSegmentLoad() ); } @@ -1189,6 +1190,7 @@ public static class IndexTuningConfig implements AppenderatorConfig private static final boolean DEFAULT_GUARANTEE_ROLLUP = false; private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false; private static final long DEFAULT_PUSH_TIMEOUT = 0; + private static final boolean DEFAULT_STRICT_TIER_AWARE_SEGMENT_LOAD = false; private final AppendableIndexSpec appendableIndexSpec; private final int maxRowsInMemory; @@ -1217,6 +1219,7 @@ public static class IndexTuningConfig implements AppenderatorConfig private final int maxParseExceptions; private final int maxSavedParseExceptions; private final long awaitSegmentAvailabilityTimeoutMillis; + private final boolean strictTierAwareSegmentLoad; @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; @@ -1290,6 +1293,7 @@ public IndexTuningConfig( @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis, + @JsonProperty("strictTierAwareSegmentLoad") @Nullable Boolean strictTierAwareSegmentLoad, @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { @@ -1319,6 +1323,7 @@ public IndexTuningConfig( maxSavedParseExceptions, maxColumnsToMerge, awaitSegmentAvailabilityTimeoutMillis, + strictTierAwareSegmentLoad, numPersistThreads ); @@ -1330,7 +1335,28 @@ public IndexTuningConfig( private IndexTuningConfig() { - this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); + this( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); } private IndexTuningConfig( @@ -1352,6 +1378,7 @@ private IndexTuningConfig( @Nullable Integer maxSavedParseExceptions, @Nullable Integer maxColumnsToMerge, @Nullable Long awaitSegmentAvailabilityTimeoutMillis, + @Nullable Boolean strictTierAwareSegmentLoad, @Nullable Integer numPersistThreads ) { @@ -1398,6 +1425,9 @@ private IndexTuningConfig( } else { this.awaitSegmentAvailabilityTimeoutMillis = awaitSegmentAvailabilityTimeoutMillis; } + this.strictTierAwareSegmentLoad = strictTierAwareSegmentLoad == null + ? DEFAULT_STRICT_TIER_AWARE_SEGMENT_LOAD + : strictTierAwareSegmentLoad; this.numPersistThreads = numPersistThreads == null ? DEFAULT_NUM_PERSIST_THREADS : Math.max(numPersistThreads, DEFAULT_NUM_PERSIST_THREADS); } @@ -1424,6 +1454,7 @@ public IndexTuningConfig withBasePersistDirectory(File dir) maxSavedParseExceptions, maxColumnsToMerge, awaitSegmentAvailabilityTimeoutMillis, + strictTierAwareSegmentLoad, numPersistThreads ); } @@ -1613,6 +1644,12 @@ public long getAwaitSegmentAvailabilityTimeoutMillis() return awaitSegmentAvailabilityTimeoutMillis; } + @JsonProperty + public boolean isStrictTierAwareSegmentLoad() + { + return strictTierAwareSegmentLoad; + } + @JsonProperty @Override public int getNumPersistThreads() @@ -1648,7 +1685,8 @@ public boolean equals(Object o) Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) && - Objects.equals(awaitSegmentAvailabilityTimeoutMillis, that.awaitSegmentAvailabilityTimeoutMillis); + awaitSegmentAvailabilityTimeoutMillis == that.awaitSegmentAvailabilityTimeoutMillis && + strictTierAwareSegmentLoad == that.strictTierAwareSegmentLoad; } @Override @@ -1673,6 +1711,7 @@ public int hashCode() maxSavedParseExceptions, segmentWriteOutMediumFactory, awaitSegmentAvailabilityTimeoutMillis, + strictTierAwareSegmentLoad, numPersistThreads ); } @@ -1698,6 +1737,7 @@ public String toString() ", maxSavedParseExceptions=" + maxSavedParseExceptions + ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory + ", awaitSegmentAvailabilityTimeoutMillis=" + awaitSegmentAvailabilityTimeoutMillis + + ", strictTierAwareSegmentLoad=" + strictTierAwareSegmentLoad + ", numPersistThreads=" + numPersistThreads + '}'; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 44147e242955..27d4c846eec4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -178,6 +178,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask private final boolean missingIntervalsInOverwriteMode; private final long awaitSegmentAvailabilityTimeoutMillis; + private final boolean strictTierAwareSegmentLoad; @MonotonicNonNull private AuthorizerMapper authorizerMapper; @@ -266,6 +267,7 @@ public ParallelIndexSupervisorTask( } awaitSegmentAvailabilityTimeoutMillis = ingestionSchema.getTuningConfig().getAwaitSegmentAvailabilityTimeoutMillis(); + strictTierAwareSegmentLoad = ingestionSchema.getTuningConfig().isStrictTierAwareSegmentLoad(); this.ingestionState = IngestionState.NOT_STARTED; this.isCompactionTask = isCompactionTask; } @@ -633,7 +635,8 @@ private void waitForSegmentAvailability(Map report waitForSegmentAvailability( toolbox, segmentsToWaitFor, - awaitSegmentAvailabilityTimeoutMillis + awaitSegmentAvailabilityTimeoutMillis, + strictTierAwareSegmentLoad ); } @@ -1344,6 +1347,7 @@ private static IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningC tuningConfig.getMaxSavedParseExceptions(), tuningConfig.getMaxColumnsToMerge(), tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis(), + tuningConfig.isStrictTierAwareSegmentLoad(), tuningConfig.getNumPersistThreads() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index d97d9beff343..58dc1a93946a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -108,6 +108,7 @@ public static ParallelIndexTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -144,6 +145,7 @@ public ParallelIndexTuningConfig( @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis, + @JsonProperty("strictTierAwareSegmentLoad") @Nullable Boolean strictTierAwareSegmentLoad, @JsonProperty("maxAllowedLockCount") @Nullable Integer maxAllowedLockCount, @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) @@ -173,6 +175,7 @@ public ParallelIndexTuningConfig( maxSavedParseExceptions, maxColumnsToMerge, awaitSegmentAvailabilityTimeoutMillis, + strictTierAwareSegmentLoad, numPersistThreads ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 816cfc1574dd..8d8c39beb660 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -291,7 +291,8 @@ public StreamAppenderatorDriver newDriver( new ActionBasedPublishedSegmentRetriever(toolbox.getTaskActionClient()), toolbox.getDataSegmentKiller(), toolbox.getJsonMapper(), - metrics + metrics, + tuningConfig.isStrictTierAwareSegmentLoad() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java index 6dda59421198..0e0b790aa23c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java @@ -42,6 +42,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi private static final Period DEFAULT_INTERMEDIATE_PERSIST_PERIOD = new Period("PT10M"); private static final Boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = Boolean.FALSE; private static final boolean DEFAULT_RELEASE_LOCKS_ON_HANDOFF = false; + private static final boolean DEFAULT_STRICT_TIER_AWARE_SEGMENT_LOAD = false; private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = Duration.ofMinutes(15).toMillis(); private final AppendableIndexSpec appendableIndexSpec; @@ -70,6 +71,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi private final int numPersistThreads; private final int maxColumnsToMerge; private final boolean releaseLocksOnHandoff; + private final boolean strictTierAwareSegmentLoad; public SeekableStreamIndexTaskTuningConfig( @Nullable AppendableIndexSpec appendableIndexSpec, @@ -94,7 +96,8 @@ public SeekableStreamIndexTaskTuningConfig( @Nullable Integer maxSavedParseExceptions, @Nullable Integer numPersistThreads, @Nullable Integer maxColumnsToMerge, - @Nullable Boolean releaseLocksOnHandoff + @Nullable Boolean releaseLocksOnHandoff, + @Nullable Boolean strictTierAwareSegmentLoad ) { this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec; @@ -150,6 +153,10 @@ public SeekableStreamIndexTaskTuningConfig( } this.maxColumnsToMerge = maxColumnsToMerge == null ? DEFAULT_MAX_COLUMNS_TO_MERGE : maxColumnsToMerge; this.releaseLocksOnHandoff = Configs.valueOrDefault(releaseLocksOnHandoff, DEFAULT_RELEASE_LOCKS_ON_HANDOFF); + this.strictTierAwareSegmentLoad = Configs.valueOrDefault( + strictTierAwareSegmentLoad, + DEFAULT_STRICT_TIER_AWARE_SEGMENT_LOAD + ); } @Override @@ -303,6 +310,12 @@ public boolean isReleaseLocksOnHandoff() return releaseLocksOnHandoff; } + @JsonProperty + public boolean isStrictTierAwareSegmentLoad() + { + return strictTierAwareSegmentLoad; + } + public abstract SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir); @Override @@ -330,6 +343,7 @@ public boolean equals(Object o) numPersistThreads == that.numPersistThreads && maxColumnsToMerge == that.maxColumnsToMerge && releaseLocksOnHandoff == that.releaseLocksOnHandoff && + strictTierAwareSegmentLoad == that.strictTierAwareSegmentLoad && Objects.equals(partitionsSpec, that.partitionsSpec) && Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && @@ -364,7 +378,8 @@ public int hashCode() maxSavedParseExceptions, numPersistThreads, maxColumnsToMerge, - releaseLocksOnHandoff + releaseLocksOnHandoff, + strictTierAwareSegmentLoad ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java index 493f6b477565..5997bdc5f0ab 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java @@ -87,6 +87,7 @@ public void testSerdeTuningConfigWithDeprecatedDynamicPartitionsSpec() throws IO .withForceGuaranteedRollup(false) .withPushTimeout(100L) .withAwaitSegmentAvailabilityTimeoutMillis(1L) + .withStrictTierAwareSegmentLoad(true) .build(); assertSerdeTuningConfig(tuningConfig); } @@ -124,6 +125,7 @@ public void testSerdeTuningConfigWithDeprecatedHashedPartitionsSpec() throws IOE 100, 1234, null, + null, null ); assertSerdeTuningConfig(tuningConfig); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index a2b0b2255537..a141a4da78df 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -1140,6 +1140,58 @@ public void testWaitForSegmentAvailabilityMultipleSegmentsTimeout() EasyMock.verify(mockFactory, mockNotifier); } + @Test + public void testWaitForSegmentAvailabilityStrictTierAwareSegmentLoad() + { + final TaskToolbox mockToolbox = EasyMock.createMock(TaskToolbox.class); + final SegmentHandoffNotifierFactory mockFactory = EasyMock.createMock(SegmentHandoffNotifierFactory.class); + final SegmentHandoffNotifier mockNotifier = EasyMock.createMock(SegmentHandoffNotifier.class); + + final DataSegment mockDataSegment = EasyMock.createMock(DataSegment.class); + final List segmentsToWaitFor = new ArrayList<>(); + segmentsToWaitFor.add(mockDataSegment); + + final IndexTask indexTask = createIndexTask( + createDefaultIngestionSpec( + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.MINUTE, + null + ), + createTuningConfigWithMaxRowsPerSegment(2, true), + false, + false + ), + null + ); + + EasyMock.expect(mockDataSegment.getInterval()).andReturn(Intervals.of("1970-01-01/2100-01-01")).once(); + EasyMock.expect(mockDataSegment.getVersion()).andReturn("dummyString").once(); + EasyMock.expect(mockDataSegment.getShardSpec()).andReturn(EasyMock.createMock(ShardSpec.class)).once(); + + EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory()).andReturn(mockFactory).once(); + EasyMock.expect(mockToolbox.getEmitter()).andReturn(new NoopServiceEmitter()).anyTimes(); + EasyMock.expect(mockDataSegment.getDataSource()).andReturn("MockDataSource").once(); + EasyMock.expect(mockFactory.createSegmentHandoffNotifier("MockDataSource", indexTask.getId(), true)) + .andReturn(mockNotifier) + .once(); + mockNotifier.start(); + EasyMock.expectLastCall().once(); + mockNotifier.registerSegmentHandoffCallback(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().andReturn(true).once(); + mockNotifier.close(); + EasyMock.expectLastCall().once(); + + EasyMock.replay(mockToolbox); + EasyMock.replay(mockDataSegment); + EasyMock.replay(mockFactory, mockNotifier); + + Assert.assertFalse(indexTask.waitForSegmentAvailability(mockToolbox, segmentsToWaitFor, 0, true)); + EasyMock.verify(mockToolbox); + EasyMock.verify(mockDataSegment); + EasyMock.verify(mockFactory, mockNotifier); + } + @Test public void testWaitForSegmentAvailabilityMultipleSegmentsSuccess() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TuningConfigBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TuningConfigBuilder.java index abac64de8bea..8cd0cd131175 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TuningConfigBuilder.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TuningConfigBuilder.java @@ -69,6 +69,7 @@ public abstract class TuningConfigBuilder createNextPendingRequests( * Queries the coordinator to check if a list of segments has been handed off. * Returns a list of segments which have been handed off. *
- * See {@link org.apache.druid.server.http.DataSourcesResource#isHandOffComplete(String, String, int, String)} + * See {@link org.apache.druid.server.http.DataSourcesResource#isHandOffComplete(String, String, int, String, String)} */ private List checkSegmentHandoff(List segmentDescriptors) { diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java index fb23fadf4894..8f48b6af9926 100644 --- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java +++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java @@ -48,6 +48,24 @@ public interface CoordinatorClient */ ListenableFuture isHandoffComplete(String dataSource, SegmentDescriptor descriptor); + /** + * Checks if the given segment is handed off or not, optionally requiring the + * segment to be loaded on every tier with a positive replica count in the matching retention rule. + */ + default ListenableFuture isHandoffComplete( + final String dataSource, + final SegmentDescriptor descriptor, + final boolean strictTierAwareSegmentLoad + ) + { + if (strictTierAwareSegmentLoad) { + throw new UnsupportedOperationException( + "Strict tier-aware segment load handoff checks are not supported by this CoordinatorClient implementation." + ); + } + return isHandoffComplete(dataSource, descriptor); + } + /** * Fetches segment metadata for the given dataSource and segmentId. If includeUnused is set to false, the segment is * not returned if it is marked as unused. diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java index ab0545bf3455..84e2b8d8a3b0 100644 --- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java +++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java @@ -79,14 +79,25 @@ public CoordinatorClientImpl( } @Override - public ListenableFuture isHandoffComplete(String dataSource, SegmentDescriptor descriptor) + public ListenableFuture isHandoffComplete(final String dataSource, final SegmentDescriptor descriptor) + { + return isHandoffComplete(dataSource, descriptor, false); + } + + @Override + public ListenableFuture isHandoffComplete( + final String dataSource, + final SegmentDescriptor descriptor, + final boolean strictTierAwareSegmentLoad + ) { final String path = StringUtils.format( - "/druid/coordinator/v1/datasources/%s/handoffComplete?interval=%s&partitionNumber=%d&version=%s", + "/druid/coordinator/v1/datasources/%s/handoffComplete?interval=%s&partitionNumber=%d&version=%s%s", StringUtils.urlEncode(dataSource), StringUtils.urlEncode(descriptor.getInterval().toString()), descriptor.getPartitionNumber(), - StringUtils.urlEncode(descriptor.getVersion()) + StringUtils.urlEncode(descriptor.getVersion()), + strictTierAwareSegmentLoad ? "&strictTierAwareSegmentLoad=true" : "" ); return FutureUtils.transform( diff --git a/server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java index b8aa55ec1e4a..aa63fee97def 100644 --- a/server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java +++ b/server/src/main/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java @@ -44,7 +44,17 @@ public class NoopCoordinatorClient implements CoordinatorClient { @Override - public ListenableFuture isHandoffComplete(String dataSource, SegmentDescriptor descriptor) + public ListenableFuture isHandoffComplete(final String dataSource, final SegmentDescriptor descriptor) + { + throw new UnsupportedOperationException(); + } + + @Override + public ListenableFuture isHandoffComplete( + final String dataSource, + final SegmentDescriptor descriptor, + final boolean strictTierAwareSegmentLoad + ) { throw new UnsupportedOperationException(); } diff --git a/server/src/main/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifier.java b/server/src/main/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifier.java index 3709c49dbc94..3df7c0378a49 100644 --- a/server/src/main/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifier.java +++ b/server/src/main/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifier.java @@ -45,6 +45,7 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot private final Duration pollDuration; private final String dataSource; private final String taskId; + private final boolean strictTierAwareSegmentLoad; public CoordinatorBasedSegmentHandoffNotifier( String dataSource, @@ -52,11 +53,23 @@ public CoordinatorBasedSegmentHandoffNotifier( CoordinatorBasedSegmentHandoffNotifierConfig config, String taskId ) + { + this(dataSource, coordinatorClient, config, taskId, false); + } + + public CoordinatorBasedSegmentHandoffNotifier( + String dataSource, + CoordinatorClient coordinatorClient, + CoordinatorBasedSegmentHandoffNotifierConfig config, + String taskId, + boolean strictTierAwareSegmentLoad + ) { this.dataSource = dataSource; this.coordinatorClient = coordinatorClient; this.pollDuration = config.getPollDuration(); this.taskId = taskId; + this.strictTierAwareSegmentLoad = strictTierAwareSegmentLoad; } @Override @@ -92,7 +105,12 @@ void checkForSegmentHandoffs() SegmentDescriptor descriptor = entry.getKey(); try { Boolean handOffComplete = - FutureUtils.getUnchecked(coordinatorClient.isHandoffComplete(dataSource, descriptor), true); + FutureUtils.getUnchecked( + strictTierAwareSegmentLoad + ? coordinatorClient.isHandoffComplete(dataSource, descriptor, true) + : coordinatorClient.isHandoffComplete(dataSource, descriptor), + true + ); if (Boolean.TRUE.equals(handOffComplete)) { log.debug("Segment handoff complete for dataSource[%s] segment[%s] for task[%s]", dataSource, descriptor, taskId); entry.getValue().lhs.execute(entry.getValue().rhs); diff --git a/server/src/main/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifierFactory.java b/server/src/main/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifierFactory.java index 447831da9786..5eab70a9893a 100644 --- a/server/src/main/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifierFactory.java +++ b/server/src/main/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifierFactory.java @@ -40,12 +40,23 @@ public CoordinatorBasedSegmentHandoffNotifierFactory( @Override public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource, String taskId) + { + return createSegmentHandoffNotifier(dataSource, taskId, false); + } + + @Override + public SegmentHandoffNotifier createSegmentHandoffNotifier( + String dataSource, + String taskId, + boolean strictTierAwareSegmentLoad + ) { return new CoordinatorBasedSegmentHandoffNotifier( dataSource, client, config, - taskId + taskId, + strictTierAwareSegmentLoad ); } } diff --git a/server/src/main/java/org/apache/druid/segment/handoff/NoopSegmentHandoffNotifierFactory.java b/server/src/main/java/org/apache/druid/segment/handoff/NoopSegmentHandoffNotifierFactory.java index bae6a4a1906a..f2515930e073 100644 --- a/server/src/main/java/org/apache/druid/segment/handoff/NoopSegmentHandoffNotifierFactory.java +++ b/server/src/main/java/org/apache/druid/segment/handoff/NoopSegmentHandoffNotifierFactory.java @@ -54,4 +54,14 @@ public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource, St { return NOTIFIER; } + + @Override + public SegmentHandoffNotifier createSegmentHandoffNotifier( + String dataSource, + String taskId, + boolean strictTierAwareSegmentLoad + ) + { + return NOTIFIER; + } } diff --git a/server/src/main/java/org/apache/druid/segment/handoff/SegmentHandoffNotifierFactory.java b/server/src/main/java/org/apache/druid/segment/handoff/SegmentHandoffNotifierFactory.java index ad9f0c5ecf09..58631c46d273 100644 --- a/server/src/main/java/org/apache/druid/segment/handoff/SegmentHandoffNotifierFactory.java +++ b/server/src/main/java/org/apache/druid/segment/handoff/SegmentHandoffNotifierFactory.java @@ -23,4 +23,19 @@ public interface SegmentHandoffNotifierFactory { SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource, String taskId); + + default SegmentHandoffNotifier createSegmentHandoffNotifier( + final String dataSource, + final String taskId, + final boolean strictTierAwareSegmentLoad + ) + { + if (strictTierAwareSegmentLoad) { + throw new UnsupportedOperationException( + "Strict tier-aware segment load handoff checks are not supported by this " + + "SegmentHandoffNotifierFactory implementation." + ); + } + return createSegmentHandoffNotifier(dataSource, taskId); + } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index 53b0fcbba3b9..2bb033e43f44 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -104,11 +104,46 @@ public StreamAppenderatorDriver( ObjectMapper objectMapper, SegmentGenerationMetrics metrics ) + { + this( + appenderator, + segmentAllocator, + handoffNotifierFactory, + segmentRetriever, + dataSegmentKiller, + objectMapper, + metrics, + false + ); + } + + public StreamAppenderatorDriver( + Appenderator appenderator, + SegmentAllocator segmentAllocator, + SegmentHandoffNotifierFactory handoffNotifierFactory, + PublishedSegmentRetriever segmentRetriever, + DataSegmentKiller dataSegmentKiller, + ObjectMapper objectMapper, + SegmentGenerationMetrics metrics, + boolean strictTierAwareSegmentLoad + ) { super(appenderator, segmentAllocator, segmentRetriever, dataSegmentKiller); - this.handoffNotifier = Preconditions.checkNotNull(handoffNotifierFactory, "handoffNotifierFactory") - .createSegmentHandoffNotifier(appenderator.getDataSource(), appenderator.getId()); + final SegmentHandoffNotifierFactory nonNullHandoffNotifierFactory = Preconditions.checkNotNull( + handoffNotifierFactory, + "handoffNotifierFactory" + ); + this.handoffNotifier = strictTierAwareSegmentLoad + ? nonNullHandoffNotifierFactory.createSegmentHandoffNotifier( + appenderator.getDataSource(), + appenderator.getId(), + true + ) + : nonNullHandoffNotifierFactory.createSegmentHandoffNotifier( + appenderator.getDataSource(), + appenderator.getId() + ); this.metrics = Preconditions.checkNotNull(metrics, "metrics"); this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper"); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 72f50a87a570..b0ebf2647a21 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -238,6 +238,11 @@ public Map> getTierToDatasourceToUnderReplicatedC return computeUnderReplicated(dataSegments, useClusterView); } + public CoordinatorDynamicConfig getCurrentDynamicConfig() + { + return metadataManager.configs().getCurrentDynamicConfig(); + } + public Map> getTierToDatasourceToUnderReplicatedCount( Iterable dataSegments, boolean useClusterView @@ -248,16 +253,21 @@ public Map> getTierToDatasourceToUnderReplicatedC public Object2IntMap getDatasourceToUnavailableSegmentCount() { - if (segmentReplicationStatus == null) { + return getDatasourceToUnavailableSegmentCount(false); + } + + public Object2IntMap getDatasourceToUnavailableSegmentCount(final boolean strictTierAwareSegmentLoad) + { + final SegmentReplicationStatus replicationStatus = segmentReplicationStatus; + if (replicationStatus == null) { return Object2IntMaps.emptyMap(); } final Object2IntOpenHashMap datasourceToUnavailableSegments = new Object2IntOpenHashMap<>(); final Iterable dataSegments = metadataManager.iterateAllUsedSegments(); - for (DataSegment segment : dataSegments) { - SegmentReplicaCount replicaCount = segmentReplicationStatus.getReplicaCountsInCluster(segment.getId()); - if (replicaCount != null && (replicaCount.totalLoaded() > 0 || replicaCount.required() == 0)) { + for (final DataSegment segment : dataSegments) { + if (replicationStatus.isSegmentAvailable(segment.getId(), strictTierAwareSegmentLoad)) { datasourceToUnavailableSegments.addTo(segment.getDataSource(), 0); } else { datasourceToUnavailableSegments.addTo(segment.getDataSource(), 1); @@ -287,26 +297,48 @@ public Object2IntMap getDatasourceToDeepStorageQueryOnlySegmentCount() } public Map getDatasourceToLoadStatus() + { + return getDatasourceToLoadStatus(false); + } + + public Map getDatasourceToLoadStatus(final boolean strictTierAwareSegmentLoad) { final Map loadStatus = new HashMap<>(); final DataSourcesSnapshot snapshot = metadataManager.segments().getRecentDataSourcesSnapshot(); + final SegmentReplicationStatus replicationStatus = segmentReplicationStatus; - for (ImmutableDruidDataSource dataSource : snapshot.getDataSourcesWithAllUsedSegments()) { + if (strictTierAwareSegmentLoad && replicationStatus == null) { + return Collections.emptyMap(); + } + + for (final ImmutableDruidDataSource dataSource : snapshot.getDataSourcesWithAllUsedSegments()) { final Set segments = Sets.newHashSet(dataSource.getSegments()); final int numPublishedSegments = segments.size(); - - // remove loaded segments - for (DruidServer druidServer : serverInventoryView.getInventory()) { - final DruidDataSource loadedView = druidServer.getDataSource(dataSource.getName()); - if (loadedView != null) { - // This does not use segments.removeAll(loadedView.getSegments()) for performance reasons. - // Please see https://github.com/apache/druid/pull/5632 for more info. - for (DataSegment serverSegment : loadedView.getSegments()) { - segments.remove(serverSegment); + final int numUnavailableSegments; + + if (strictTierAwareSegmentLoad) { + numUnavailableSegments = (int) segments.stream() + .filter( + segment -> !replicationStatus.isSegmentAvailable( + segment.getId(), + true + ) + ) + .count(); + } else { + // remove loaded segments + for (final DruidServer druidServer : serverInventoryView.getInventory()) { + final DruidDataSource loadedView = druidServer.getDataSource(dataSource.getName()); + if (loadedView != null) { + // This does not use segments.removeAll(loadedView.getSegments()) for performance reasons. + // Please see https://github.com/apache/druid/pull/5632 for more info. + for (final DataSegment serverSegment : loadedView.getSegments()) { + segments.remove(serverSegment); + } } } + numUnavailableSegments = segments.size(); } - final int numUnavailableSegments = segments.size(); loadStatus.put( dataSource.getName(), (numPublishedSegments - numUnavailableSegments) * 100.0 / numPublishedSegments @@ -316,6 +348,21 @@ public Map getDatasourceToLoadStatus() return loadStatus; } + public boolean isSegmentAvailable(DataSegment segment) + { + return isSegmentAvailable(segment, false); + } + + public boolean isSegmentAvailable(final DataSegment segment, final boolean strictTierAwareSegmentLoad) + { + final SegmentReplicationStatus replicationStatus = segmentReplicationStatus; + return replicationStatus != null + && replicationStatus.isSegmentAvailable( + segment.getId(), + strictTierAwareSegmentLoad + ); + } + /** * @return Set of broadcast segments determined by the latest run of the {@link RunRules} duty. * If the coordinator runs haven't triggered or are delayed, this information may be stale. diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCount.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCount.java index 1f9650a59f07..27ccb7415b8e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCount.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicaCount.java @@ -136,6 +136,11 @@ public int totalLoaded() return loaded + loadedNonHistorical; } + int loadedOnHistoricals() + { + return loaded; + } + /** * Number of replicas which are safely loaded on historical servers and are * not being dropped. diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatus.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatus.java index 7121642f25ed..8a0dab350aa1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatus.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatus.java @@ -56,6 +56,34 @@ public SegmentReplicaCount getReplicaCountsInCluster(SegmentId segmentId) return totalReplicaCounts.get(segmentId); } + public boolean isSegmentAvailable(final SegmentId segmentId, final boolean strictTierAwareSegmentLoad) + { + final SegmentReplicaCount countsInCluster = totalReplicaCounts.get(segmentId); + if (countsInCluster == null) { + return false; + } + + if (countsInCluster.required() == 0) { + return true; + } + + if (!strictTierAwareSegmentLoad) { + return countsInCluster.totalLoaded() > 0; + } + + final Map tierToReplicaCount = replicaCountsInTier.get(segmentId); + if (tierToReplicaCount == null) { + return false; + } + + for (final SegmentReplicaCount countsInTier : tierToReplicaCount.values()) { + if (countsInTier.required() > 0 && countsInTier.loadedOnHistoricals() == 0) { + return false; + } + } + return true; + } + public Map> getTierToDatasourceToUnderReplicated( Iterable usedSegments, boolean ignoreMissingServers diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java index 1f61464bafbf..598c6d4ce5fd 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java @@ -83,11 +83,15 @@ public Response isLeader() public Response getLoadStatus( @QueryParam("simple") String simple, @QueryParam("full") String full, - @QueryParam("computeUsingClusterView") @Nullable String computeUsingClusterView + @QueryParam("computeUsingClusterView") @Nullable String computeUsingClusterView, + @QueryParam("strictTierAwareSegmentLoad") @Nullable final String strictTierAwareSegmentLoad ) { + final boolean strictTierAwareLoad = isQueryParamEnabled(strictTierAwareSegmentLoad); if (simple != null) { - return Response.ok(coordinator.getDatasourceToUnavailableSegmentCount()).build(); + return Response.ok( + coordinator.getDatasourceToUnavailableSegmentCount(strictTierAwareLoad) + ).build(); } if (full != null) { @@ -95,7 +99,12 @@ public Response getLoadStatus( coordinator.getTierToDatasourceToUnderReplicatedCount(computeUsingClusterView != null) ).build(); } - return Response.ok(coordinator.getDatasourceToLoadStatus()).build(); + return Response.ok(coordinator.getDatasourceToLoadStatus(strictTierAwareLoad)).build(); + } + + private static boolean isQueryParamEnabled(@Nullable String queryParam) + { + return queryParam != null && (queryParam.isEmpty() || Boolean.parseBoolean(queryParam)); } @GET diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 31281842923f..57776007e1c2 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -433,7 +433,8 @@ public Response getDatasourceLoadstatus( @QueryParam("interval") @Nullable final String interval, @QueryParam("simple") @Nullable final String simple, @QueryParam("full") @Nullable final String full, - @QueryParam("computeUsingClusterView") @Nullable String computeUsingClusterView + @QueryParam("computeUsingClusterView") @Nullable String computeUsingClusterView, + @QueryParam("strictTierAwareSegmentLoad") @Nullable final String strictTierAwareSegmentLoad ) { if (forceMetadataRefresh == null) { @@ -470,9 +471,11 @@ public Response getDatasourceLoadstatus( .build(); } + final boolean strictTierAwareLoad = isQueryParamEnabled(strictTierAwareSegmentLoad); if (simple != null) { // Calculate response for simple mode - SegmentsLoadStatistics segmentsLoadStatistics = computeSegmentLoadStatistics(segments); + SegmentsLoadStatistics segmentsLoadStatistics = + computeSegmentLoadStatistics(segments, strictTierAwareLoad); return Response.ok( ImmutableMap.of( dataSourceName, @@ -491,7 +494,8 @@ public Response getDatasourceLoadstatus( return Response.ok(segmentLoadMap).build(); } else { // Calculate response for default mode - SegmentsLoadStatistics segmentsLoadStatistics = computeSegmentLoadStatistics(segments); + SegmentsLoadStatistics segmentsLoadStatistics = + computeSegmentLoadStatistics(segments, strictTierAwareLoad); return Response.ok( ImmutableMap.of( dataSourceName, @@ -502,13 +506,20 @@ public Response getDatasourceLoadstatus( } } - private SegmentsLoadStatistics computeSegmentLoadStatistics(Iterable segments) + private SegmentsLoadStatistics computeSegmentLoadStatistics( + Iterable segments, + final boolean strictTierAwareSegmentLoad + ) { - Map segmentLoadInfos = serverInventoryView.getLoadInfoForAllSegments(); + if (strictTierAwareSegmentLoad) { + return computeSegmentLoadStatisticsFromCoordinator(segments); + } + + final Map segmentLoadInfos = serverInventoryView.getLoadInfoForAllSegments(); int numPublishedSegments = 0; int numUnavailableSegments = 0; int numLoadedSegments = 0; - for (DataSegment segment : segments) { + for (final DataSegment segment : segments) { numPublishedSegments++; if (!segmentLoadInfos.containsKey(segment.getId())) { numUnavailableSegments++; @@ -519,6 +530,22 @@ private SegmentsLoadStatistics computeSegmentLoadStatistics(Iterable segments) + { + int numPublishedSegments = 0; + int numUnavailableSegments = 0; + int numLoadedSegments = 0; + for (final DataSegment segment : segments) { + numPublishedSegments++; + if (coordinator != null && coordinator.isSegmentAvailable(segment, true)) { + numLoadedSegments++; + } else { + numUnavailableSegments++; + } + } + return new SegmentsLoadStatistics(numPublishedSegments, numUnavailableSegments, numLoadedSegments); + } + private static class SegmentsLoadStatistics { private final int numPublishedSegments; @@ -919,11 +946,13 @@ public Response isHandOffComplete( @PathParam("dataSourceName") String dataSourceName, @QueryParam("interval") final String interval, @QueryParam("partitionNumber") final int partitionNumber, - @QueryParam("version") final String version + @QueryParam("version") final String version, + @QueryParam("strictTierAwareSegmentLoad") @Nullable final String strictTierAwareSegmentLoad ) { try { final List rules = metadataRuleManager.getRulesWithDefault(dataSourceName); + final boolean strictTierAwareLoad = isQueryParamEnabled(strictTierAwareSegmentLoad); final Interval theInterval = Intervals.of(interval); final SegmentDescriptor descriptor = new SegmentDescriptor(theInterval, version, partitionNumber); final DateTime now = DateTimes.nowUtc(); @@ -941,7 +970,8 @@ public Response isHandOffComplete( // A segment that is not eligible for load will never be handed off boolean eligibleForLoad = false; - for (Rule rule : rules) { + Set requiredLoadTiers = Set.of(); + for (final Rule rule : rules) { final boolean applies; if (rule.isIntervalBased()) { applies = rule.appliesTo(theInterval, now); @@ -954,7 +984,13 @@ public Response isHandOffComplete( applies = rule.appliesTo(segment, now); } if (applies) { - eligibleForLoad = rule instanceof LoadRule && ((LoadRule) rule).shouldMatchingSegmentBeLoaded(); + if (rule instanceof LoadRule) { + final LoadRule loadRule = (LoadRule) rule; + eligibleForLoad = loadRule.shouldMatchingSegmentBeLoaded(); + if (eligibleForLoad && strictTierAwareLoad) { + requiredLoadTiers = getRequiredLoadTiers(loadRule, getHistoricalTierAliases()); + } + } break; } } @@ -978,9 +1014,16 @@ public Response isHandOffComplete( return Response.ok(true).build(); } - Iterable servedSegmentsInInterval = + final Iterable servedSegmentsInInterval = prepareServedSegmentsInInterval(timeline, theInterval); - if (isSegmentLoaded(servedSegmentsInInterval, descriptor)) { + final boolean segmentLoaded = strictTierAwareLoad + ? isSegmentLoadedOnRequiredTiers( + servedSegmentsInInterval, + descriptor, + requiredLoadTiers + ) + : isSegmentLoaded(servedSegmentsInInterval, descriptor); + if (segmentLoaded) { return Response.ok(true).build(); } @@ -992,6 +1035,51 @@ public Response isHandOffComplete( } } + private Map> getHistoricalTierAliases() + { + return coordinator == null ? Map.of() : coordinator.getCurrentDynamicConfig().getHistoricalTierAliases(); + } + + private static Set getRequiredLoadTiers( + LoadRule loadRule, + Map> historicalTierAliases + ) + { + final Map expandedTieredReplicants = + expandTieredReplicantsWithAliases(loadRule.getTieredReplicants(), historicalTierAliases); + return expandedTieredReplicants.entrySet() + .stream() + .filter(entry -> entry.getValue() > 0) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + } + + private static Map expandTieredReplicantsWithAliases( + Map tieredReplicants, + Map> historicalTierAliases + ) + { + if (historicalTierAliases.isEmpty()) { + return tieredReplicants; + } + + final Map expanded = new HashMap<>(); + tieredReplicants.forEach((tier, replicaCount) -> { + final Set aliasTiers = historicalTierAliases.get(tier); + if (aliasTiers == null) { + expanded.put(tier, replicaCount); + } else { + aliasTiers.forEach(alias -> expanded.putIfAbsent(alias, replicaCount)); + } + }); + return expanded; + } + + private static boolean isQueryParamEnabled(@Nullable String queryParam) + { + return queryParam != null && (queryParam.isEmpty() || Boolean.parseBoolean(queryParam)); + } + @Nullable private static DataSegment lookupSegment(@Nullable DataSourcesSnapshot snapshot, SegmentId segmentId) { @@ -1004,17 +1092,44 @@ private static DataSegment lookupSegment(@Nullable DataSourcesSnapshot snapshot, static boolean isSegmentLoaded(Iterable servedSegments, SegmentDescriptor descriptor) { - for (ImmutableSegmentLoadInfo segmentLoadInfo : servedSegments) { - if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval()) - && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() == descriptor.getPartitionNumber() - && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0 - && Iterables.any( - segmentLoadInfo.getServers(), DruidServerMetadata::isSegmentReplicationTarget - )) { + for (final ImmutableSegmentLoadInfo segmentLoadInfo : servedSegments) { + if (matchesSegmentDescriptor(segmentLoadInfo, descriptor) + && Iterables.any(segmentLoadInfo.getServers(), DruidServerMetadata::isSegmentReplicationTarget)) { return true; } } return false; } + static boolean isSegmentLoadedOnRequiredTiers( + Iterable servedSegments, + SegmentDescriptor descriptor, + Set requiredTiers + ) + { + final Set loadedTiers = new HashSet<>(); + for (final ImmutableSegmentLoadInfo segmentLoadInfo : servedSegments) { + if (!matchesSegmentDescriptor(segmentLoadInfo, descriptor)) { + continue; + } + + segmentLoadInfo.getServers() + .stream() + .filter(DruidServerMetadata::isSegmentReplicationTarget) + .map(DruidServerMetadata::getTier) + .forEach(loadedTiers::add); + } + return loadedTiers.containsAll(requiredTiers); + } + + private static boolean matchesSegmentDescriptor( + ImmutableSegmentLoadInfo segmentLoadInfo, + SegmentDescriptor descriptor + ) + { + return segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval()) + && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() == descriptor.getPartitionNumber() + && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0; + } + } diff --git a/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java b/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java index 0b39eb150160..57dad4746fca 100644 --- a/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java +++ b/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java @@ -163,6 +163,33 @@ public void test_isHandoffComplete() throws Exception ); } + @Test + public void test_isHandoffCompleteWithStrictTierAwareSegmentLoad() throws Exception + { + serviceClient.expectAndRespond( + new RequestBuilder( + HttpMethod.GET, + "/druid/coordinator/v1/datasources/xyz/handoffComplete?" + + "interval=2000-01-01T00%3A00%3A00.000Z%2F3000-01-01T00%3A00%3A00.000Z&" + + "partitionNumber=2&" + + "version=1&" + + "strictTierAwareSegmentLoad=true" + ), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + StringUtils.toUtf8("true") + ); + + Assert.assertEquals( + true, + coordinatorClient.isHandoffComplete( + "xyz", + new SegmentDescriptor(Intervals.of("2000/3000"), "1", 2), + true + ).get() + ); + } + @Test public void test_fetchUsedSegment() throws Exception { diff --git a/server/src/test/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifierTest.java b/server/src/test/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifierTest.java index 86fde003cfae..c778f7173d91 100644 --- a/server/src/test/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifierTest.java +++ b/server/src/test/java/org/apache/druid/segment/handoff/CoordinatorBasedSegmentHandoffNotifierTest.java @@ -107,4 +107,36 @@ public void testHandoffCallbackCalled() Assert.assertTrue(callbackCalled.get()); EasyMock.verify(coordinatorClient); } + + @Test + public void testStrictTierAwareSegmentLoad() + { + final Interval interval = Intervals.of("2011-04-01/2011-04-02"); + final SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2); + + final AtomicBoolean callbackCalled = new AtomicBoolean(false); + final CoordinatorClient coordinatorClient = EasyMock.createMock(CoordinatorClient.class); + EasyMock.expect(coordinatorClient.isHandoffComplete("test_ds", descriptor, true)) + .andReturn(Futures.immediateFuture(true)) + .once(); + EasyMock.replay(coordinatorClient); + final CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier( + "test_ds", + coordinatorClient, + notifierConfig, + "test_task", + true + ); + + notifier.registerSegmentHandoffCallback( + descriptor, + Execs.directExecutor(), + () -> callbackCalled.set(true) + ); + notifier.checkForSegmentHandoffs(); + + Assert.assertTrue(notifier.getHandOffCallbacks().isEmpty()); + Assert.assertTrue(callbackCalled.get()); + EasyMock.verify(coordinatorClient); + } } diff --git a/server/src/test/java/org/apache/druid/server/AsyncManagementForwardingServletTest.java b/server/src/test/java/org/apache/druid/server/AsyncManagementForwardingServletTest.java index 5b414d6f3cb0..a53a9c7c5985 100644 --- a/server/src/test/java/org/apache/druid/server/AsyncManagementForwardingServletTest.java +++ b/server/src/test/java/org/apache/druid/server/AsyncManagementForwardingServletTest.java @@ -163,7 +163,7 @@ public void testCoordinatorDatasources() throws Exception public void testCoordinatorLoadStatus() throws Exception { COORDINATOR_EXPECTED_REQUEST.path = "/druid/coordinator/v1/loadstatus"; - COORDINATOR_EXPECTED_REQUEST.query = "full"; + COORDINATOR_EXPECTED_REQUEST.query = "full&strictTierAwareSegmentLoad=true"; COORDINATOR_EXPECTED_REQUEST.method = "GET"; COORDINATOR_EXPECTED_REQUEST.headers = ImmutableMap.of("Authorization", "Basic bXl1c2VyOm15cGFzc3dvcmQ="); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 332e6ffbadac..c9eb9c4b9c75 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -366,6 +366,68 @@ public void testCoordinatorTieredRun() throws Exception EasyMock.verify(metadataRuleManager); } + @Test(timeout = 60_000L) + public void testLoadStatusCanUseStrictTierAwareSegmentLoad() throws Exception + { + final String dataSource = "dataSource", hotTierName = "hot", coldTierName = "cold"; + final DataSegment dataSegment = new DataSegment( + dataSource, + Intervals.of("2018-01-02/P1D"), + "v1", + null, + null, + null, + null, + 0x9, + 0 + ); + final Rule tieredRule = new ForeverLoadRule(ImmutableMap.of(hotTierName, 1, coldTierName, 1), null); + EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString())) + .andReturn(ImmutableList.of(tieredRule)).atLeastOnce(); + metadataRuleManager.stop(); + EasyMock.expectLastCall().once(); + + final DruidDataSource dataSourceWithSegment = new DruidDataSource(dataSource, Collections.emptyMap()); + dataSourceWithSegment.addSegment(dataSegment); + setupSegmentsMetadataMock(dataSourceWithSegment); + + final DruidServer hotServer = new DruidServer("hot", "hot", null, 5L, null, ServerType.HISTORICAL, hotTierName, 0); + hotServer.addDataSegment(dataSegment); + final DruidServer coldServer = + new DruidServer("cold", "cold", null, 5L, null, ServerType.HISTORICAL, coldTierName, 0); + setupPeons(ImmutableMap.of("hot", new TestLoadQueuePeon(), "cold", new TestLoadQueuePeon())); + EasyMock.expect(serverInventoryView.getInventory()) + .andReturn(ImmutableList.of(hotServer, coldServer)) + .atLeastOnce(); + EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes(); + + EasyMock.replay(metadataRuleManager, serverInventoryView, loadQueueTaskMaster); + + coordinator.start(); + leaderAnnouncerLatch.await(); + serviceEmitter.coordinatorRunLatch.await(); + + Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getDatasourceToLoadStatus()); + Assert.assertEquals(ImmutableMap.of(dataSource, 0.0), coordinator.getDatasourceToLoadStatus(true)); + + final Object2IntMap defaultUnavailableUsedSegmentsPerDataSource = + coordinator.getDatasourceToUnavailableSegmentCount(); + Assert.assertEquals(1, defaultUnavailableUsedSegmentsPerDataSource.size()); + Assert.assertEquals(0, defaultUnavailableUsedSegmentsPerDataSource.getInt(dataSource)); + + final Object2IntMap tierAwareUnavailableUsedSegmentsPerDataSource = + coordinator.getDatasourceToUnavailableSegmentCount(true); + Assert.assertEquals(1, tierAwareUnavailableUsedSegmentsPerDataSource.size()); + Assert.assertEquals(1, tierAwareUnavailableUsedSegmentsPerDataSource.getInt(dataSource)); + + coordinator.stop(); + leaderUnannouncerLatch.await(); + + EasyMock.verify(serverInventoryView); + EasyMock.verify(segmentsMetadataManager); + EasyMock.verify(metadataRuleManager); + } + @Test(timeout = 60_000L) public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWithBroadcastRule() throws Exception { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatusTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatusTest.java new file mode 100644 index 000000000000..9e766b2be21a --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentReplicationStatusTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.loading; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.timeline.SegmentId; +import org.junit.Assert; +import org.junit.Test; + +public class SegmentReplicationStatusTest +{ + @Test + public void testStrictSegmentAvailabilityRequiresHistoricalReplicaInEveryRequiredTier() + { + final SegmentId segmentId = SegmentId.dummy("wiki"); + final SegmentReplicationStatus statusWithNonHistoricalReplica = new SegmentReplicationStatus( + ImmutableMap.of( + segmentId, + ImmutableMap.of( + "hot", requiredHistoricalReplica(), + "cold", requiredNonHistoricalReplica() + ) + ) + ); + + Assert.assertTrue(statusWithNonHistoricalReplica.isSegmentAvailable(segmentId, false)); + Assert.assertFalse(statusWithNonHistoricalReplica.isSegmentAvailable(segmentId, true)); + + final SegmentReplicationStatus statusWithHistoricalReplicas = new SegmentReplicationStatus( + ImmutableMap.of( + segmentId, + ImmutableMap.of( + "hot", requiredHistoricalReplica(), + "cold", requiredHistoricalReplica() + ) + ) + ); + Assert.assertTrue(statusWithHistoricalReplicas.isSegmentAvailable(segmentId, true)); + } + + private static SegmentReplicaCount requiredHistoricalReplica() + { + final SegmentReplicaCount replicaCount = new SegmentReplicaCount(); + replicaCount.setRequired(1, 1); + replicaCount.incrementLoaded(); + return replicaCount; + } + + private static SegmentReplicaCount requiredNonHistoricalReplica() + { + final SegmentReplicaCount replicaCount = new SegmentReplicaCount(); + replicaCount.setRequired(1, 1); + replicaCount.incrementLoadedOnNonHistoricalServer(); + return replicaCount; + } +} diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorResourceTest.java index fe56c5c38d1d..7b157efe9c36 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorResourceTest.java @@ -20,6 +20,7 @@ package org.apache.druid.server.http; import com.google.common.collect.ImmutableMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.duty.DutyGroupStatus; @@ -81,6 +82,53 @@ public void testIsLeader() Assert.assertEquals(404, response2.getStatus()); } + @Test + public void testGetLoadStatusCanUseStrictTierAwareSegmentLoad() + { + EasyMock.expect(mock.getDatasourceToLoadStatus(false)) + .andReturn(ImmutableMap.of("wiki", 100.0)) + .once(); + EasyMock.expect(mock.getDatasourceToLoadStatus(true)) + .andReturn(ImmutableMap.of("wiki", 0.0)) + .times(2); + EasyMock.expect(mock.getDatasourceToLoadStatus(false)) + .andReturn(ImmutableMap.of("wiki", 100.0)) + .once(); + EasyMock.replay(mock); + + final CoordinatorResource resource = new CoordinatorResource(mock); + final Response defaultResponse = resource.getLoadStatus(null, null, null, null); + Assert.assertEquals(ImmutableMap.of("wiki", 100.0), defaultResponse.getEntity()); + Assert.assertEquals(200, defaultResponse.getStatus()); + + final Response tierAwareResponse = resource.getLoadStatus(null, null, null, "true"); + Assert.assertEquals(ImmutableMap.of("wiki", 0.0), tierAwareResponse.getEntity()); + Assert.assertEquals(200, tierAwareResponse.getStatus()); + + final Response bareParamResponse = resource.getLoadStatus(null, null, null, ""); + Assert.assertEquals(ImmutableMap.of("wiki", 0.0), bareParamResponse.getEntity()); + Assert.assertEquals(200, bareParamResponse.getStatus()); + + final Response falseParamResponse = resource.getLoadStatus(null, null, null, "false"); + Assert.assertEquals(ImmutableMap.of("wiki", 100.0), falseParamResponse.getEntity()); + Assert.assertEquals(200, falseParamResponse.getStatus()); + } + + @Test + public void testGetLoadStatusSimpleCanUseStrictTierAwareSegmentLoad() + { + final Object2IntOpenHashMap unavailableCounts = new Object2IntOpenHashMap<>(); + unavailableCounts.put("wiki", 1); + EasyMock.expect(mock.getDatasourceToUnavailableSegmentCount(true)) + .andReturn(unavailableCounts) + .once(); + EasyMock.replay(mock); + + final Response response = new CoordinatorResource(mock).getLoadStatus("true", null, null, "true"); + Assert.assertEquals(unavailableCounts, response.getEntity()); + Assert.assertEquals(200, response.getStatus()); + } + @Test public void testGetLoadStatusSimple() { diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java index 8b117834d363..c0105f15040e 100644 --- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java @@ -53,6 +53,7 @@ import org.apache.druid.segment.TestDataSource; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.rules.CannotMatchBehavior; import org.apache.druid.server.coordinator.rules.ExactProjectionPartialLoadMatcher; @@ -677,7 +678,7 @@ public void testIsHandOffComplete() EasyMock.replay(databaseRuleManager); String interval1 = "2013-01-01T01:00:00Z/2013-01-01T02:00:00Z"; - Response response1 = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval1, 1, "v1"); + Response response1 = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval1, 1, "v1", null); Assert.assertTrue((boolean) response1.getEntity()); EasyMock.verify(databaseRuleManager); @@ -693,7 +694,7 @@ public void testIsHandOffComplete() EasyMock.replay(inventoryView, databaseRuleManager); String interval2 = "2013-01-02T01:00:00Z/2013-01-02T02:00:00Z"; - Response response2 = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval2, 1, "v1"); + Response response2 = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval2, 1, "v1", null); Assert.assertFalse((boolean) response2.getEntity()); EasyMock.verify(inventoryView, databaseRuleManager); @@ -724,12 +725,105 @@ public List> lookupWithIncompleteP .once(); EasyMock.replay(inventoryView, databaseRuleManager); - Response response3 = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval3, 1, "v1"); + Response response3 = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval3, 1, "v1", null); Assert.assertTrue((boolean) response3.getEntity()); EasyMock.verify(inventoryView, databaseRuleManager); } + @Test + public void testIsHandOffCompleteUsesStrictTierAwareSegmentLoadWhenRequested() + { + final MetadataRuleManager databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class); + final DruidCoordinator druidCoordinator = EasyMock.createMock(DruidCoordinator.class); + final CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig.builder().build(); + final String interval = "2013-01-02T02:00:00Z/2013-01-02T03:00:00Z"; + final Rule loadRule = new IntervalLoadRule( + Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), + ImmutableMap.of("hot", 1, "cold", 1), + null + ); + final SegmentLoadInfo hotOnlySegmentLoadInfo = new SegmentLoadInfo(createSegment(Intervals.of(interval), "v1", 1)); + hotOnlySegmentLoadInfo.addServer(createHistoricalServerMetadata("hotServer", "hot")); + final VersionedIntervalTimeline hotOnlyTimeline = + createTimeline(interval, hotOnlySegmentLoadInfo); + + final SegmentLoadInfo allTierSegmentLoadInfo = new SegmentLoadInfo(createSegment(Intervals.of(interval), "v1", 1)); + allTierSegmentLoadInfo.addServer(createHistoricalServerMetadata("hotServer", "hot")); + allTierSegmentLoadInfo.addServer(createHistoricalServerMetadata("coldServer", "cold")); + final VersionedIntervalTimeline allTierTimeline = + createTimeline(interval, allTierSegmentLoadInfo); + + final DataSourcesResource dataSourcesResource = + new DataSourcesResource( + inventoryView, + segmentsMetadataManager, + databaseRuleManager, + null, + null, + druidCoordinator, + auditManager + ); + + EasyMock.expect(databaseRuleManager.getRulesWithDefault(TestDataSource.WIKI)) + .andReturn(ImmutableList.of(loadRule)) + .times(4); + EasyMock.expect(druidCoordinator.getCurrentDynamicConfig()) + .andReturn(dynamicConfig) + .times(2); + EasyMock.expect(inventoryView.getTimeline(new TableDataSource(TestDataSource.WIKI))) + .andReturn(hotOnlyTimeline) + .once(); + EasyMock.expect(inventoryView.getTimeline(new TableDataSource(TestDataSource.WIKI))) + .andReturn(hotOnlyTimeline) + .once(); + EasyMock.expect(inventoryView.getTimeline(new TableDataSource(TestDataSource.WIKI))) + .andReturn(hotOnlyTimeline) + .once(); + EasyMock.expect(inventoryView.getTimeline(new TableDataSource(TestDataSource.WIKI))) + .andReturn(allTierTimeline) + .once(); + EasyMock.replay(inventoryView, databaseRuleManager, druidCoordinator); + + final Response defaultResponse = dataSourcesResource.isHandOffComplete( + TestDataSource.WIKI, + interval, + 1, + "v1", + null + ); + Assert.assertTrue((boolean) defaultResponse.getEntity()); + + final Response falseParamResponse = dataSourcesResource.isHandOffComplete( + TestDataSource.WIKI, + interval, + 1, + "v1", + "false" + ); + Assert.assertTrue((boolean) falseParamResponse.getEntity()); + + final Response hotOnlyResponse = dataSourcesResource.isHandOffComplete( + TestDataSource.WIKI, + interval, + 1, + "v1", + "true" + ); + Assert.assertFalse((boolean) hotOnlyResponse.getEntity()); + + final Response allTierResponse = dataSourcesResource.isHandOffComplete( + TestDataSource.WIKI, + interval, + 1, + "v1", + "true" + ); + Assert.assertTrue((boolean) allTierResponse.getEntity()); + + EasyMock.verify(inventoryView, databaseRuleManager, druidCoordinator); + } + @Test public void testIsHandOffCompleteSegmentNotInMetadataReturnsTrue() { @@ -766,7 +860,7 @@ public void testIsHandOffCompleteSegmentNotInMetadataReturnsTrue() EasyMock.replay(databaseRuleManager, segmentsMetadataManager); String interval = "2013-01-01T01:00:00Z/2013-01-01T02:00:00Z"; - Response response = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval, 1, "v1"); + Response response = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval, 1, "v1", null); Assert.assertTrue((boolean) response.getEntity()); EasyMock.verify(databaseRuleManager, segmentsMetadataManager); @@ -813,7 +907,7 @@ public void testIsHandOffCompleteForcesMetadataRefreshOnSnapshotMiss() .once(); EasyMock.replay(inventoryView, databaseRuleManager, segmentsMetadataManager); - Response response = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval, 1, "v1"); + Response response = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval, 1, "v1", null); Assert.assertFalse((boolean) response.getEntity()); EasyMock.verify(inventoryView, databaseRuleManager, segmentsMetadataManager); @@ -864,7 +958,7 @@ public void testIsHandOffCompleteWithPartialLoadRuleFallThrough() .once(); EasyMock.replay(databaseRuleManager, segmentsMetadataManager); - Response response = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval, 1, "v1"); + Response response = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval, 1, "v1", null); Assert.assertTrue((boolean) response.getEntity()); EasyMock.verify(databaseRuleManager, segmentsMetadataManager); @@ -916,7 +1010,7 @@ public void testIsHandOffCompleteWithPartialLoadRuleMatcherResolves() .once(); EasyMock.replay(inventoryView, databaseRuleManager, segmentsMetadataManager); - Response response = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval, 1, "v1"); + Response response = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval, 1, "v1", null); Assert.assertFalse((boolean) response.getEntity()); EasyMock.verify(inventoryView, databaseRuleManager, segmentsMetadataManager); @@ -1344,6 +1438,59 @@ public void testSegmentLoadChecksForAssignableServer() ); } + @Test + public void testSegmentLoadChecksForRequiredTiers() + { + final Interval interval = Intervals.of("2011-04-01/2011-04-02"); + final SegmentDescriptor descriptor = new SegmentDescriptor(interval, "v1", 2); + final Set requiredTiers = ImmutableSet.of("hot", "cold"); + + Assert.assertFalse( + DataSourcesResource.isSegmentLoadedOnRequiredTiers( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createHistoricalServerMetadata("a", "hot")) + ) + ), + descriptor, + requiredTiers + ) + ); + + Assert.assertTrue( + DataSourcesResource.isSegmentLoadedOnRequiredTiers( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet( + createHistoricalServerMetadata("a", "hot"), + createHistoricalServerMetadata("b", "cold") + ) + ) + ), + descriptor, + requiredTiers + ) + ); + + Assert.assertFalse( + DataSourcesResource.isSegmentLoadedOnRequiredTiers( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet( + createHistoricalServerMetadata("a", "hot"), + createRealtimeServerMetadata("b", "cold") + ) + ) + ), + descriptor, + requiredTiers + ) + ); + } + @Test public void testSegmentLoadChecksForPartitionNumber() { @@ -1662,7 +1809,7 @@ public void testMarkSegmentsAsUnusedWithNonNullIntervalAndEmptySegmentIds() @Test public void testGetDatasourceLoadstatusForceMetadataRefreshNull() { - Response response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, null, null, null, null, null); + Response response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, null, null, null, null, null, null); Assert.assertEquals(400, response.getStatus()); } @@ -1675,7 +1822,7 @@ public void testGetDatasourceLoadstatusNoSegmentForInterval() ).andReturn(DataSourcesSnapshot.fromUsedSegments(List.of())).once(); EasyMock.replay(segmentsMetadataManager); - Response response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, true, null, null, null, null); + Response response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, true, null, null, null, null, null); Assert.assertEquals(204, response.getStatus()); } @@ -1733,7 +1880,7 @@ public void testGetDatasourceLoadstatusDefault() EasyMock.expect(inventoryView.getLoadInfoForAllSegments()).andReturn(completedLoadInfoMap).once(); EasyMock.replay(segmentsMetadataManager, inventoryView); - Response response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, true, null, null, null, null); + Response response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, true, null, null, null, null, null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals(1, ((Map) response.getEntity()).size()); @@ -1748,7 +1895,7 @@ public void testGetDatasourceLoadstatusDefault() EasyMock.expect(inventoryView.getLoadInfoForAllSegments()).andReturn(halfLoadedInfoMap).once(); EasyMock.replay(segmentsMetadataManager, inventoryView); - response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, true, null, null, null, null); + response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, true, null, null, null, null, null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals(1, ((Map) response.getEntity()).size()); @@ -1757,6 +1904,52 @@ public void testGetDatasourceLoadstatusDefault() EasyMock.verify(segmentsMetadataManager, inventoryView); } + @Test + public void testGetDatasourceLoadstatusDefaultUsesStrictTierAwareSegmentLoadWhenRequested() + { + final DateTime now = DateTimes.nowUtc(); + final DataSegment loadedOnAllTiers = new DataSegment( + TestDataSource.WIKI, + new Interval(now.minusDays(5), Period.days(1)), + "", + null, + null, + null, + null, + 0x9, + 10 + ); + final DataSegment missingTier = new DataSegment( + TestDataSource.WIKI, + new Interval(now.minusDays(4), Period.days(1)), + "", + null, + null, + null, + null, + 0x9, + 20 + ); + final List segments = ImmutableList.of(loadedOnAllTiers, missingTier); + final DruidCoordinator druidCoordinator = EasyMock.createMock(DruidCoordinator.class); + final DataSourcesResource dataSourcesResource = + new DataSourcesResource(null, segmentsMetadataManager, null, null, null, druidCoordinator, auditManager); + + EasyMock.expect(segmentsMetadataManager.forceUpdateDataSourcesSnapshot()) + .andReturn(DataSourcesSnapshot.fromUsedSegments(segments)) + .once(); + EasyMock.expect(druidCoordinator.isSegmentAvailable(loadedOnAllTiers, true)).andReturn(true).once(); + EasyMock.expect(druidCoordinator.isSegmentAvailable(missingTier, true)).andReturn(false).once(); + EasyMock.replay(segmentsMetadataManager, druidCoordinator); + + final Response response = + dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, true, null, null, null, null, "true"); + Assert.assertEquals(200, response.getStatus()); + Assert.assertNotNull(response.getEntity()); + Assert.assertEquals(50.0, ((Map) response.getEntity()).get(TestDataSource.WIKI)); + EasyMock.verify(segmentsMetadataManager, druidCoordinator); + } + @Test public void testGetDatasourceLoadstatusSimple() { @@ -1811,7 +2004,7 @@ public void testGetDatasourceLoadstatusSimple() EasyMock.expect(inventoryView.getLoadInfoForAllSegments()).andReturn(completedLoadInfoMap).once(); EasyMock.replay(segmentsMetadataManager, inventoryView); - Response response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, true, null, "simple", null, null); + Response response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, true, null, "simple", null, null, null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals(1, ((Map) response.getEntity()).size()); @@ -1826,7 +2019,7 @@ public void testGetDatasourceLoadstatusSimple() EasyMock.expect(inventoryView.getLoadInfoForAllSegments()).andReturn(halfLoadedInfoMap).once(); EasyMock.replay(segmentsMetadataManager, inventoryView); - response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, true, null, "simple", null, null); + response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, true, null, "simple", null, null, null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals(1, ((Map) response.getEntity()).size()); @@ -1883,7 +2076,7 @@ public void testGetDatasourceLoadstatusFull() DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null, druidCoordinator, auditManager); - Response response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, true, null, null, "full", null); + Response response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, true, null, null, "full", null, null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals(2, ((Map) response.getEntity()).size()); @@ -1942,7 +2135,15 @@ public void testGetDatasourceLoadstatusFullAndComputeUsingClusterView() DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null, druidCoordinator, auditManager); - Response response = dataSourcesResource.getDatasourceLoadstatus(TestDataSource.WIKI, true, null, null, "full", "computeUsingClusterView"); + Response response = dataSourcesResource.getDatasourceLoadstatus( + TestDataSource.WIKI, + true, + null, + null, + "full", + "computeUsingClusterView", + null + ); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals(2, ((Map) response.getEntity()).size()); @@ -1955,17 +2156,32 @@ public void testGetDatasourceLoadstatusFullAndComputeUsingClusterView() private DruidServerMetadata createRealtimeServerMetadata(String name) { - return createServerMetadata(name, ServerType.REALTIME); + return createRealtimeServerMetadata(name, "tier"); + } + + private DruidServerMetadata createRealtimeServerMetadata(String name, String tier) + { + return createServerMetadata(name, ServerType.REALTIME, tier); } private DruidServerMetadata createHistoricalServerMetadata(String name) { - return createServerMetadata(name, ServerType.HISTORICAL); + return createHistoricalServerMetadata(name, "tier"); + } + + private DruidServerMetadata createHistoricalServerMetadata(String name, String tier) + { + return createServerMetadata(name, ServerType.HISTORICAL, tier); } private DruidServerMetadata createServerMetadata(String name, ServerType type) { - return new DruidServerMetadata(name, name, null, 10000, null, type, "tier", 1); + return createServerMetadata(name, type, "tier"); + } + + private DruidServerMetadata createServerMetadata(String name, ServerType type, String tier) + { + return new DruidServerMetadata(name, name, null, 10000, null, type, tier, 1); } private DataSegment createSegment(Interval interval, String version, int partitionNumber) @@ -1982,6 +2198,25 @@ private DataSegment createSegment(Interval interval, String version, int partiti ); } + private VersionedIntervalTimeline createTimeline( + String interval, + SegmentLoadInfo segmentLoadInfo + ) + { + return new VersionedIntervalTimeline<>(null) + { + @Override + public List> lookupWithIncompletePartitions(Interval lookupInterval) + { + final PartitionHolder partitionHolder = + new PartitionHolder<>(new NumberedPartitionChunk<>(1, 1, segmentLoadInfo)); + final List> ret = new ArrayList<>(); + ret.add(new TimelineObjectHolder<>(Intervals.of(interval), "v1", partitionHolder)); + return ret; + } + }; + } + private void prepareRequestForAudit() { EasyMock.expect(request.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn("author").anyTimes(); diff --git a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java index 6be8754dd128..c4e5414944c9 100644 --- a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java @@ -128,6 +128,7 @@ public void testTaskValidator() throws Exception null, null, null, + null, 2 ) ),