Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ public void test_boundedSupervisor_ingestsDataAndCompletes()

cluster.callApi().postSupervisor(supervisor);

// Wait for records to be ingested
waitUntilPublishedRecordsAreIngested(totalRecords);
// Bounded supervisor cold start (post supervisor -> schedule task -> consume -> publish) can exceed
// the cluster default wait on CI; give it a generous ceiling.
waitUntilPublishedRecordsAreIngested(totalRecords, 120_000L);

// Wait for supervisor to transition to COMPLETED state
waitForSupervisorToComplete(supervisor.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,34 @@ protected KinesisSupervisorSpec createKinesisSupervisor(KinesisResource kinesis,

/**
* Waits until the total row count of successfully published segments matches
* {@code expectedRowCount}.
* {@code expectedRowCount}, using the cluster default emitter timeout.
*/
protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
{
indexer.latchableEmitter().waitForEventAggregate(
event -> event.hasMetricName("ingest/rows/published")
.hasDimension(DruidMetrics.DATASOURCE, dataSource),
agg -> agg.hasSumAtLeast(expectedRowCount)
);
waitUntilPublishedRecordsAreIngested(expectedRowCount, null);
}

/**
* Same as {@link #waitUntilPublishedRecordsAreIngested(int)} but with an explicit timeout in millis.
* Use for ingestion paths with a heavier task lifecycle (e.g. bounded supervisor cold start) where the
* cluster default may not allow enough headroom on CI.
*/
protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount, Long timeoutMillis)
{
if (timeoutMillis == null) {
Comment thread
Shekharrajak marked this conversation as resolved.
indexer.latchableEmitter().waitForEventAggregate(
event -> event.hasMetricName("ingest/rows/published")
.hasDimension(DruidMetrics.DATASOURCE, dataSource),
agg -> agg.hasSumAtLeast(expectedRowCount)
);
} else {
indexer.latchableEmitter().waitForEventAggregate(
event -> event.hasMetricName("ingest/rows/published")
.hasDimension(DruidMetrics.DATASOURCE, dataSource),
agg -> agg.hasSumAtLeast(expectedRowCount),
timeoutMillis
);
}

final int totalEventsProcessed = indexer
.latchableEmitter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,18 @@ public void waitForEventAggregate(
UnaryOperator<EventMatcher> condition,
UnaryOperator<AggregateMatcher> aggregateCondition
)
{
waitForEventAggregate(condition, aggregateCondition, defaultWaitTimeoutMillis);
}

/**
* Same as {@link #waitForEventAggregate(UnaryOperator, UnaryOperator)} but with an explicit timeout.
*/
public void waitForEventAggregate(
UnaryOperator<EventMatcher> condition,
UnaryOperator<AggregateMatcher> aggregateCondition,
long timeoutMillis
)
{
final EventMatcher eventMatcher = condition.apply(new EventMatcher());
final AggregateMatcher aggregateMatcher = aggregateCondition.apply(new AggregateMatcher());
Expand All @@ -210,7 +222,7 @@ public void waitForEventAggregate(
event -> event instanceof ServiceMetricEvent
&& eventMatcher.test((ServiceMetricEvent) event)
&& aggregateMatcher.test((ServiceMetricEvent) event),
defaultWaitTimeoutMillis
timeoutMillis
);
}

Expand Down
Loading