Give the bounded Kafka supervisor test an explicit ingestion wait timeout.

* LatchableEmitter: add a waitForEventAggregate overload that takes an explicit
  timeoutMillis; the existing two-argument overload now delegates to it with
  defaultWaitTimeoutMillis.
* StreamIndexTestBase: add waitUntilPublishedRecordsAreIngested(int, Long). A null
  timeout calls the two-argument waitForEventAggregate (default timeout); a
  non-null timeout is passed through. The one-argument method delegates with null.
* KafkaBoundedSupervisorTest: wait with a timeout of 120_000 ms instead of the default.

diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
index fa184418df52..ad3a3f3fff14 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java
@@ -87,8 +87,9 @@ public void test_boundedSupervisor_ingestsDataAndCompletes()
 
     cluster.callApi().postSupervisor(supervisor);
 
-    // Wait for records to be ingested
-    waitUntilPublishedRecordsAreIngested(totalRecords);
+    // Bounded supervisor cold start (post supervisor -> schedule task -> consume -> publish) can exceed
+    // the cluster default wait on CI; give it a generous ceiling.
+    waitUntilPublishedRecordsAreIngested(totalRecords, 120_000L);
 
     // Wait for supervisor to transition to COMPLETED state
     waitForSupervisorToComplete(supervisor.getId());
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
index ee826936b2b4..9f6d54eab346 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java
@@ -131,15 +131,34 @@ protected KinesisSupervisorSpec createKinesisSupervisor(KinesisResource kinesis,
 
   /**
    * Waits until the total row count of successfully published segments matches
-   * {@code expectedRowCount}.
+   * {@code expectedRowCount}, using the cluster default emitter timeout.
    */
   protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
   {
-    indexer.latchableEmitter().waitForEventAggregate(
-        event -> event.hasMetricName("ingest/rows/published")
-                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
-        agg -> agg.hasSumAtLeast(expectedRowCount)
-    );
+    waitUntilPublishedRecordsAreIngested(expectedRowCount, null);
+  }
+
+  /**
+   * Same as {@link #waitUntilPublishedRecordsAreIngested(int)} but with an explicit timeout in millis.
+   * Use for ingestion paths with a heavier task lifecycle (e.g. bounded supervisor cold start) where the
+   * cluster default may not allow enough headroom on CI.
+   */
+  protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount, Long timeoutMillis)
+  {
+    if (timeoutMillis == null) {
+      indexer.latchableEmitter().waitForEventAggregate(
+          event -> event.hasMetricName("ingest/rows/published")
+                        .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+          agg -> agg.hasSumAtLeast(expectedRowCount)
+      );
+    } else {
+      indexer.latchableEmitter().waitForEventAggregate(
+          event -> event.hasMetricName("ingest/rows/published")
+                        .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+          agg -> agg.hasSumAtLeast(expectedRowCount),
+          timeoutMillis
+      );
+    }
 
     final int totalEventsProcessed = indexer
         .latchableEmitter()
diff --git a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
index e9779c1ddd68..fc4b40af6d66 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
@@ -202,6 +202,18 @@ public void waitForEventAggregate(
       UnaryOperator<EventMatcher> condition,
       UnaryOperator<AggregateMatcher> aggregateCondition
   )
+  {
+    waitForEventAggregate(condition, aggregateCondition, defaultWaitTimeoutMillis);
+  }
+
+  /**
+   * Same as {@link #waitForEventAggregate(UnaryOperator, UnaryOperator)} but with an explicit timeout.
+   */
+  public void waitForEventAggregate(
+      UnaryOperator<EventMatcher> condition,
+      UnaryOperator<AggregateMatcher> aggregateCondition,
+      long timeoutMillis
+  )
   {
     final EventMatcher eventMatcher = condition.apply(new EventMatcher());
     final AggregateMatcher aggregateMatcher = aggregateCondition.apply(new AggregateMatcher());
@@ -210,7 +222,7 @@ public void waitForEventAggregate(
         event -> event instanceof ServiceMetricEvent
                  && eventMatcher.test((ServiceMetricEvent) event)
                  && aggregateMatcher.test((ServiceMetricEvent) event),
-        defaultWaitTimeoutMillis
+        timeoutMillis
     );
   }
 