diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchIndexProgressState.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchIndexProgressState.java index 77be7a09dd..1433451774 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchIndexProgressState.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchIndexProgressState.java @@ -6,11 +6,15 @@ package org.opensearch.dataprepper.plugins.source.opensearch; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchShardStatistics; import java.time.Instant; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Objects; public class OpenSearchIndexProgressState { @@ -27,6 +31,14 @@ public class OpenSearchIndexProgressState { @JsonInclude(JsonInclude.Include.NON_NULL) private List searchAfter; + @JsonProperty("had_search_failures") + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private boolean hadSearchFailures; + + @JsonProperty("failure_reason_counts") + @JsonInclude(JsonInclude.Include.NON_EMPTY) + private Map failureReasonCounts = new LinkedHashMap<>(); + public OpenSearchIndexProgressState() { } @@ -35,11 +47,15 @@ public OpenSearchIndexProgressState() { public OpenSearchIndexProgressState(@JsonProperty("pit_id") final String pitId, @JsonProperty("pit_creation_time") final Long pitCreationTime, @JsonProperty("pit_keep_alive") final Long pitKeepAlive, - @JsonProperty("pit_search_after") final List searchAfter) { + @JsonProperty("pit_search_after") final List searchAfter, + @JsonProperty("had_search_failures") final boolean hadSearchFailures, + @JsonProperty("failure_reason_counts") final Map failureReasonCounts) { this.pitId = pitId; this.pitCreationTime = pitCreationTime; this.keepAlive = pitKeepAlive; this.searchAfter = searchAfter; + this.hadSearchFailures = hadSearchFailures; + this.failureReasonCounts = failureReasonCounts == null ? new LinkedHashMap<>() : new LinkedHashMap<>(failureReasonCounts); } public List getSearchAfter() { @@ -74,6 +90,49 @@ public void setKeepAlive(final Long keepAlive) { this.keepAlive = keepAlive; } + @JsonIgnore + private transient ShardFailureAggregator failureAggregator; + + private ShardFailureAggregator getOrCreateAggregator() { + if (failureAggregator == null) { + failureAggregator = new ShardFailureAggregator(hadSearchFailures, failureReasonCounts); + } + return failureAggregator; + } + + private void syncFromAggregator() { + hadSearchFailures = failureAggregator.hadFailures(); + failureReasonCounts = new LinkedHashMap<>(failureAggregator.getFailureReasonCounts()); + } + + public boolean isHadSearchFailures() { + return getOrCreateAggregator().hadFailures(); + } + + public void setHadSearchFailures(final boolean hadSearchFailures) { + this.hadSearchFailures = hadSearchFailures; + this.failureAggregator = null; + } + + public Map getFailureReasonCounts() { + return getOrCreateAggregator().getFailureReasonCounts(); + } + + public void setFailureReasonCounts(final Map failureReasonCounts) { + this.failureReasonCounts = failureReasonCounts == null ? new LinkedHashMap<>() : new LinkedHashMap<>(failureReasonCounts); + this.failureAggregator = null; + } + + public void recordShardFailures(final SearchShardStatistics shardStatistics) { + getOrCreateAggregator().recordShardFailures(shardStatistics); + syncFromAggregator(); + } + + public void recordRequestFailure(final Throwable throwable) { + getOrCreateAggregator().recordRequestFailure(throwable); + syncFromAggregator(); + } + public boolean hasValidPointInTime() { return Objects.nonNull(pitId) && Objects.nonNull(pitCreationTime) && Objects.nonNull(keepAlive) && Instant.ofEpochMilli(pitCreationTime + keepAlive).isAfter(Instant.now()); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/ShardFailureAggregator.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/ShardFailureAggregator.java new file mode 100644 index 0000000000..6487e3377c --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/ShardFailureAggregator.java @@ -0,0 +1,87 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.source.opensearch; + +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchShardStatistics; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +class ShardFailureAggregator { + + static final int MAX_DISTINCT_REASONS = 20; + static final String OVERFLOW_REASON_KEY = "__other__"; + + private boolean hadFailures; + private final Map failureReasonCounts; + + ShardFailureAggregator() { + this.hadFailures = false; + this.failureReasonCounts = new LinkedHashMap<>(); + } + + ShardFailureAggregator(final boolean hadFailures, final Map counts) { + this.hadFailures = hadFailures; + this.failureReasonCounts = counts == null ? new LinkedHashMap<>() : new LinkedHashMap<>(counts); + } + + void recordShardFailures(final SearchShardStatistics pageStats) { + if (pageStats == null || !pageStats.hasFailures()) { + return; + } + this.hadFailures = true; + mergeFailureReasonCounts(pageStats.getFailureReasonCounts()); + } + + void recordRequestFailure(final Throwable throwable) { + this.hadFailures = true; + final String key; + if (throwable == null) { + key = "unknown"; + } else { + final String message = throwable.getMessage() == null ? "" : ": " + throwable.getMessage(); + key = SearchShardStatistics.normalizeReason(throwable.getClass().getSimpleName() + message); + } + increment(key, 1L); + } + + boolean hadFailures() { + return hadFailures; + } + + Map getFailureReasonCounts() { + return Collections.unmodifiableMap(failureReasonCounts); + } + + private void mergeFailureReasonCounts(final Map toMerge) { + if (toMerge == null || toMerge.isEmpty()) { + return; + } + for (final Map.Entry entry : toMerge.entrySet()) { + increment(entry.getKey(), entry.getValue() == null ? 0L : entry.getValue()); + } + } + + private void increment(final String key, final long delta) { + if (key == null || delta <= 0) { + return; + } + if (failureReasonCounts.containsKey(key)) { + failureReasonCounts.merge(key, delta, Long::sum); + return; + } + if (failureReasonCounts.size() < MAX_DISTINCT_REASONS) { + failureReasonCounts.put(key, delta); + return; + } + failureReasonCounts.merge(OVERFLOW_REASON_KEY, delta, Long::sum); + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java index 3f1c7792c1..07fbce28bf 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java @@ -20,6 +20,9 @@ public class OpenSearchSourcePluginMetrics { static final String BYTES_PROCESSED = "bytesProcessed"; static final String CREDENTIALS_CHANGED = "credentialsChanged"; static final String CLIENT_REFRESH_ERRORS = "clientRefreshErrors"; + static final String SEARCH_REQUESTS_FAILED = "searchRequestsFailed"; + static final String SEARCH_SHARDS_FAILED = "searchShardsFailed"; + static final String INDICES_COMPLETED_WITH_FAILURES = "indicesCompletedWithFailures"; private final Counter documentsProcessedCounter; private final Counter indicesProcessedCounter; @@ -30,6 +33,9 @@ public class OpenSearchSourcePluginMetrics { private final DistributionSummary bytesProcessedSummary; private final Counter credentialsChangeCounter; private final Counter clientRefreshErrorsCounter; + private final Counter searchRequestsFailedCounter; + private final Counter searchShardsFailedCounter; + private final Counter indicesCompletedWithFailuresCounter; public static OpenSearchSourcePluginMetrics create(final PluginMetrics pluginMetrics) { return new OpenSearchSourcePluginMetrics(pluginMetrics); @@ -44,6 +50,9 @@ private OpenSearchSourcePluginMetrics(final PluginMetrics pluginMetrics) { bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED); credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED); clientRefreshErrorsCounter = pluginMetrics.counter(CLIENT_REFRESH_ERRORS); + searchRequestsFailedCounter = pluginMetrics.counter(SEARCH_REQUESTS_FAILED); + searchShardsFailedCounter = pluginMetrics.counter(SEARCH_SHARDS_FAILED); + indicesCompletedWithFailuresCounter = pluginMetrics.counter(INDICES_COMPLETED_WITH_FAILURES); } public Counter getDocumentsProcessedCounter() { @@ -77,4 +86,16 @@ public Counter getCredentialsChangeCounter() { public Counter getClientRefreshErrorsCounter() { return clientRefreshErrorsCounter; } + + public Counter getSearchRequestsFailedCounter() { + return searchRequestsFailedCounter; + } + + public Counter getSearchShardsFailedCounter() { + return searchShardsFailedCounter; + } + + public Counter getIndicesCompletedWithFailuresCounter() { + return indicesCompletedWithFailuresCounter; + } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java index 090293691f..a63e0f43c1 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java @@ -37,6 +37,8 @@ import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.hasMorePages; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.recordShardFailuresIfAny; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME; @@ -103,7 +105,7 @@ public void run() { openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet)); completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet, - indexPartition.get(), sourceCoordinator); + indexPartition.get(), sourceCoordinator, openSearchSourcePluginMetrics); openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment(); LOG.info("Completed processing for index: '{}'", indexPartition.get().getPartitionKey()); @@ -178,15 +180,19 @@ private void processIndex(final SourcePartition op openSearchIndexProgressState.setSearchAfter(searchWithSearchAfterResults.getNextSearchAfter()); + recordShardFailuresIfAny(indexName, searchWithSearchAfterResults.getShardStatistics(), openSearchIndexProgressState, openSearchSourcePluginMetrics); + if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { LOG.debug("Renew ownership of index {}", indexName); sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState); lastCheckpointTime = System.currentTimeMillis(); } - } while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize()); + } while (hasMorePages(searchWithSearchAfterResults)); - LOG.info("Received {} documents in latest search request, and batch size is {}, exiting pagination", - searchWithSearchAfterResults.getDocuments().size(), searchConfiguration.getBatchSize()); + LOG.info("Reached end of index '{}' (last page returned {} documents, nextSearchAfter present: {}).", + indexName, + searchWithSearchAfterResults.getDocuments().size(), + searchWithSearchAfterResults.getNextSearchAfter() != null); try { bufferAccumulator.flush(); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java index f30694487f..8e8c1b6551 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java @@ -41,6 +41,8 @@ import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.hasMorePages; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.recordShardFailuresIfAny; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME; @@ -116,7 +118,7 @@ public void run() { openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet)); completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet, - indexPartition.get(), sourceCoordinator); + indexPartition.get(), sourceCoordinator, openSearchSourcePluginMetrics); openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment(); } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) { @@ -216,15 +218,19 @@ private void processIndex(final SourcePartition op openSearchIndexProgressState.setSearchAfter(searchWithSearchAfterResults.getNextSearchAfter()); openSearchIndexProgressState.setKeepAlive(Duration.ofMillis(openSearchIndexProgressState.getKeepAlive()).plus(EXTEND_KEEP_ALIVE_DURATION).toMillis()); + recordShardFailuresIfAny(indexName, searchWithSearchAfterResults.getShardStatistics(), openSearchIndexProgressState, openSearchSourcePluginMetrics); + if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { LOG.debug("Renew ownership of index {}", indexName); sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState); lastCheckpointTime = System.currentTimeMillis(); } - } while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize()); + } while (hasMorePages(searchWithSearchAfterResults)); - LOG.info("Received {} documents in latest search request, and batch size is {}, exiting pagination", - searchWithSearchAfterResults.getDocuments().size(), searchConfiguration.getBatchSize()); + LOG.info("Reached end of index '{}' (last page returned {} documents, nextSearchAfter present: {}).", + indexName, + searchWithSearchAfterResults.getDocuments().size(), + searchWithSearchAfterResults.getNextSearchAfter() != null); try { bufferAccumulator.flush(); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java index b303013f09..df8e81605f 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java @@ -40,6 +40,7 @@ import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.recordShardFailuresIfAny; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME; @@ -51,6 +52,7 @@ public class ScrollWorker implements SearchWorker { private static final Logger LOG = LoggerFactory.getLogger(ScrollWorker.class); private static final Duration BACKOFF_ON_SCROLL_LIMIT_REACHED = Duration.ofSeconds(120); static final String SCROLL_TIME_PER_BATCH = "10m"; + static final int MAX_CONSECUTIVE_SCROLL_FAILURES = 3; private final ObjectMapper objectMapper; private final SearchAccessor searchAccessor; @@ -111,7 +113,7 @@ public void run() { openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet)); completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet, - indexPartition.get(), sourceCoordinator); + indexPartition.get(), sourceCoordinator, openSearchSourcePluginMetrics); openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment(); } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) { @@ -155,7 +157,10 @@ private void processIndex(final SourcePartition op LOG.info("Started processing for index: '{}'", indexName); - final Integer batchSize = openSearchSourceConfiguration.getSearchConfiguration().getBatchSize(); + final OpenSearchIndexProgressState openSearchIndexProgressState = openSearchIndexPartition + .getPartitionState() + .orElseGet(OpenSearchIndexProgressState::new); + final List sortingOptions = SortingOptions.fromSortConfigs(openSearchSourceConfiguration.getSearchConfiguration().getSort()); final CreateScrollResponse createScrollResponse = searchAccessor.createScroll(CreateScrollRequest.builder() @@ -166,36 +171,54 @@ private void processIndex(final SourcePartition op .build()); writeDocumentsToBuffer(createScrollResponse.getDocuments(), acknowledgementSet); + recordShardFailuresIfAny(indexName, createScrollResponse.getShardStatistics(), openSearchIndexProgressState, openSearchSourcePluginMetrics); SearchScrollResponse searchScrollResponse = null; + int consecutiveFailures = 0; - if (createScrollResponse.getDocuments().size() == batchSize) { - do { - try { - searchScrollResponse = searchAccessor.searchWithScroll(SearchScrollRequest.builder() - .withScrollId(Objects.nonNull(searchScrollResponse) && Objects.nonNull(searchScrollResponse.getScrollId()) ? searchScrollResponse.getScrollId() : createScrollResponse.getScrollId()) - .withScrollTime(SCROLL_TIME_PER_BATCH) - .build()); + while (shouldKeepScrolling(searchScrollResponse, createScrollResponse)) { + try { + searchScrollResponse = searchAccessor.searchWithScroll(SearchScrollRequest.builder() + .withScrollId(currentScrollId(searchScrollResponse, createScrollResponse)) + .withScrollTime(SCROLL_TIME_PER_BATCH) + .build()); - writeDocumentsToBuffer(searchScrollResponse.getDocuments(), acknowledgementSet); + consecutiveFailures = 0; - if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { - LOG.debug("Renew ownership of index {}", indexName); - sourceCoordinator.saveProgressStateForPartition(indexName, null); - lastCheckpointTime = System.currentTimeMillis(); - } - } catch (final Exception e) { + writeDocumentsToBuffer(searchScrollResponse.getDocuments(), acknowledgementSet); + recordShardFailuresIfAny(indexName, searchScrollResponse.getShardStatistics(), openSearchIndexProgressState, openSearchSourcePluginMetrics); + + if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { + LOG.debug("Renew ownership of index {}", indexName); + sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState); + lastCheckpointTime = System.currentTimeMillis(); + } + } catch (final SearchContextLimitException | IndexNotFoundException e) { + deleteScroll(createScrollResponse.getScrollId()); + throw e; + } catch (final RuntimeException e) { + consecutiveFailures++; + openSearchSourcePluginMetrics.getSearchRequestsFailedCounter().increment(); + openSearchIndexProgressState.recordRequestFailure(e); + LOG.warn("Scroll page failed for index '{}' ({}/{}). Some documents may have been skipped. " + + "Continuing pagination with the next scroll page.", + indexName, consecutiveFailures, MAX_CONSECUTIVE_SCROLL_FAILURES, e); + if (consecutiveFailures >= MAX_CONSECUTIVE_SCROLL_FAILURES) { deleteScroll(createScrollResponse.getScrollId()); throw e; } - } while (searchScrollResponse.getDocuments().size() == batchSize); + } + } - LOG.info("Received {} documents in latest search request, and batch size is {}, exiting pagination", - searchScrollResponse.getDocuments().size(), batchSize); + if (searchScrollResponse != null) { + LOG.info("Reached end of scroll for index '{}' after last page returned {} documents.", + indexName, searchScrollResponse.getDocuments().size()); } deleteScroll(createScrollResponse.getScrollId()); + sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState); + try { bufferAccumulator.flush(); } catch (final Exception e) { @@ -204,6 +227,22 @@ private void processIndex(final SourcePartition op } } + private static boolean shouldKeepScrolling(final SearchScrollResponse latest, + final CreateScrollResponse created) { + if (latest == null) { + return !created.getDocuments().isEmpty(); + } + return !latest.getDocuments().isEmpty(); + } + + private static String currentScrollId(final SearchScrollResponse latest, + final CreateScrollResponse created) { + if (latest != null && latest.getScrollId() != null) { + return latest.getScrollId(); + } + return created.getScrollId(); + } + private void writeDocumentsToBuffer(final List documents, final AcknowledgementSet acknowledgementSet) { documents.stream().map(Record::new).forEach(record -> { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java index 1adebeb749..8e77e94dff 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java @@ -11,6 +11,9 @@ import org.opensearch.dataprepper.model.source.coordinator.SourcePartition; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchShardStatistics; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,9 +71,12 @@ static AcknowledgementSet createAcknowledgmentSet(final AcknowledgementSetManage } static void completeIndexPartition(final OpenSearchSourceConfiguration openSearchSourceConfiguration, - final AcknowledgementSet acknowledgementSet, - final SourcePartition indexPartition, - final SourceCoordinator sourceCoordinator) { + final AcknowledgementSet acknowledgementSet, + final SourcePartition indexPartition, + final SourceCoordinator sourceCoordinator, + final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { + emitFailureSummaryIfAny(indexPartition, openSearchSourcePluginMetrics); + if (openSearchSourceConfiguration.isAcknowledgmentsEnabled()) { sourceCoordinator.updatePartitionForAcknowledgmentWait(indexPartition.getPartitionKey(), OWNERSHIP_TIMEOUT); acknowledgementSet.complete(); @@ -87,8 +93,67 @@ static void completeIndexPartition(final OpenSearchSourceConfiguration openSearc } } + private static void emitFailureSummaryIfAny(final SourcePartition indexPartition, + final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { + if (openSearchSourcePluginMetrics == null) { + return; + } + final OpenSearchIndexProgressState progressState = indexPartition.getPartitionState().orElse(null); + if (progressState == null || !progressState.isHadSearchFailures()) { + return; + } + LOG.warn("Index '{}' completed with search failures. Aggregated reasons: {}. Some documents may not have been read.", + indexPartition.getPartitionKey(), + progressState.getFailureReasonCounts()); + openSearchSourcePluginMetrics.getIndicesCompletedWithFailuresCounter().increment(); + } + static long calculateExponentialBackoffAndJitter(final int retryCount) { final long jitterMillis = MIN_JITTER.toMillis() + RANDOM.nextInt((int) (MAX_JITTER.toMillis() - MIN_JITTER.toMillis() + 1)); return max(1, min(STARTING_BACKOFF.toMillis() * pow(BACKOFF_RATE, (int) min(retryCount - 1, 10)) + jitterMillis, MAX_BACKOFF.toMillis())); } + + /** + * Returns true when the search_after / PIT pagination loop should make another + * request. We stop when the cluster signals it has no more results (null + * nextSearchAfter) or when the last page returned zero documents. Short pages + * are never treated as end-of-index on their own: a short page can happen + * because of shard failures and we want to keep paging past those. + */ + static boolean hasMorePages(final SearchWithSearchAfterResults results) { + if (results == null) { + return false; + } + if (results.getNextSearchAfter() == null) { + return false; + } + return results.getDocuments() != null && !results.getDocuments().isEmpty(); + } + + /** + * Record any shard-level failures observed on a single page: emit a warning + * log with the aggregated reason counts, increment the shard-failure counter, + * and merge the stats into the persisted progress state so the completion + * summary later has a full picture. + */ + static void recordShardFailuresIfAny(final String indexName, + final SearchShardStatistics shardStatistics, + final OpenSearchIndexProgressState progressState, + final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { + if (shardStatistics == null || !shardStatistics.hasFailures()) { + return; + } + LOG.warn("OpenSearch source observed {} failed shards out of {} on index '{}'. Reasons: {}. " + + "Some documents may be missing; continuing pagination.", + shardStatistics.getFailed(), + shardStatistics.getTotal(), + indexName, + shardStatistics.getFailureReasonCounts()); + if (openSearchSourcePluginMetrics != null && shardStatistics.getFailed() > 0) { + openSearchSourcePluginMetrics.getSearchShardsFailedCounter().increment(shardStatistics.getFailed()); + } + if (progressState != null) { + progressState.recordShardFailures(shardStatistics); + } + } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java index 939359d953..c8d02dd737 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java @@ -6,8 +6,10 @@ import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.elasticsearch._types.ElasticsearchException; +import co.elastic.clients.elasticsearch._types.ErrorCause; import co.elastic.clients.elasticsearch._types.FieldSort; import co.elastic.clients.elasticsearch._types.ScoreSort; +import co.elastic.clients.elasticsearch._types.ShardStatistics; import co.elastic.clients.elasticsearch._types.SortOptions; import co.elastic.clients.elasticsearch._types.SortOrder; import co.elastic.clients.elasticsearch._types.Time; @@ -42,6 +44,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchShardStatistics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions; import org.slf4j.Logger; @@ -191,6 +194,8 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR .withCreationTime(Instant.now().toEpochMilli()) .withScrollId(searchResponse.scrollId()) .withDocuments(getDocumentsFromResponse(searchResponse)) + .withShardStatistics(toShardStatistics(searchResponse.shards())) + .withTotalHits(extractTotalHits(searchResponse)) .build(); } @@ -213,6 +218,7 @@ public SearchScrollResponse searchWithScroll(final SearchScrollRequest searchScr return SearchScrollResponse.builder() .withScrollId(searchResponse.scrollId()) .withDocuments(getDocumentsFromResponse(searchResponse)) + .withShardStatistics(toShardStatistics(searchResponse.shards())) .build(); } @@ -271,6 +277,8 @@ private SearchWithSearchAfterResults searchWithSearchAfter(final SearchRequest s return SearchWithSearchAfterResults.builder() .withDocuments(documents) .withNextSearchAfter(nextSearchAfter) + .withShardStatistics(toShardStatistics(searchResponse.shards())) + .withTotalHits(extractTotalHits(searchResponse)) .build(); } catch (final ElasticsearchException e) { if (isDueToNoIndexFound(e)) { @@ -309,6 +317,30 @@ private List getDocumentsFromResponse(final SearchResponse se .collect(Collectors.toList()); } + private SearchShardStatistics toShardStatistics(final ShardStatistics shards) { + if (shards == null) { + return SearchShardStatistics.empty(); + } + final List failures = shards.failures() == null ? null : + shards.failures().stream() + .map(f -> { + final ErrorCause reason = f == null ? null : f.reason(); + return new String[]{ + reason != null ? reason.type() : null, + reason != null ? reason.reason() : null + }; + }) + .collect(Collectors.toList()); + return SearchShardStatistics.fromShardCounts(shards.total(), shards.successful(), shards.failed(), shards.skipped(), failures); + } + + private Long extractTotalHits(final SearchResponse searchResponse) { + if (searchResponse == null || searchResponse.hits() == null || searchResponse.hits().total() == null) { + return null; + } + return searchResponse.hits().total().value(); + } + private List buildSortOptions(final List sortingOptions) { if (sortingOptions == null || sortingOptions.isEmpty()) { return DEFAULT_SORT_OPTIONS; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java index ae60b73657..4b88ccbf29 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java @@ -6,9 +6,11 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.ErrorCause; import org.opensearch.client.opensearch._types.FieldSort; import org.opensearch.client.opensearch._types.OpenSearchException; import org.opensearch.client.opensearch._types.ScoreSort; +import org.opensearch.client.opensearch._types.ShardStatistics; import org.opensearch.client.opensearch._types.SortOptions; import org.opensearch.client.opensearch._types.SortOrder; import org.opensearch.client.opensearch._types.Time; @@ -42,6 +44,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchShardStatistics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions; import org.slf4j.Logger; @@ -186,6 +189,8 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR .withCreationTime(Instant.now().toEpochMilli()) .withScrollId(searchResponse.scrollId()) .withDocuments(getDocumentsFromResponse(searchResponse)) + .withShardStatistics(toShardStatistics(searchResponse.shards())) + .withTotalHits(extractTotalHits(searchResponse)) .build(); } @@ -207,6 +212,7 @@ public SearchScrollResponse searchWithScroll(final SearchScrollRequest searchScr return SearchScrollResponse.builder() .withScrollId(searchResponse.scrollId()) .withDocuments(getDocumentsFromResponse(searchResponse)) + .withShardStatistics(toShardStatistics(searchResponse.shards())) .build(); } @@ -284,6 +290,8 @@ private SearchWithSearchAfterResults searchWithSearchAfter(final SearchRequest s return SearchWithSearchAfterResults.builder() .withDocuments(documents) .withNextSearchAfter(nextSearchAfter) + .withShardStatistics(toShardStatistics(searchResponse.shards())) + .withTotalHits(extractTotalHits(searchResponse)) .build(); } catch (final OpenSearchException e) { LOG.error(e.getMessage()); @@ -308,6 +316,30 @@ private List getDocumentsFromResponse(final SearchResponse se .collect(Collectors.toList()); } + private SearchShardStatistics toShardStatistics(final ShardStatistics shards) { + if (shards == null) { + return SearchShardStatistics.empty(); + } + final List failures = shards.failures() == null ? null : + shards.failures().stream() + .map(f -> { + final ErrorCause reason = f == null ? null : f.reason(); + return new String[]{ + reason != null ? reason.type() : null, + reason != null ? reason.reason() : null + }; + }) + .collect(Collectors.toList()); + return SearchShardStatistics.fromShardCounts(shards.total(), shards.successful(), shards.failed(), shards.skipped(), failures); + } + + private Long extractTotalHits(final SearchResponse searchResponse) { + if (searchResponse == null || searchResponse.hits() == null || searchResponse.hits().total() == null) { + return null; + } + return searchResponse.hits().total().value(); + } + private List buildSortOptions(final List sortingOptions) { if (sortingOptions == null || sortingOptions.isEmpty()) { return DEFAULT_SORT_OPTIONS; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollResponse.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollResponse.java index 517bab199e..ff229442f8 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollResponse.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollResponse.java @@ -13,6 +13,8 @@ public class CreateScrollResponse { private final String scrollId; private final Long scrollCreationTime; private final List documents; + private final SearchShardStatistics shardStatistics; + private final Long totalHits; public List getDocuments() { return documents; @@ -22,12 +24,24 @@ public String getScrollId() { return scrollId; } - public Long getScrollCreationTime() { return scrollCreationTime; } + public Long getScrollCreationTime() { + return scrollCreationTime; + } + + public SearchShardStatistics getShardStatistics() { + return shardStatistics == null ? SearchShardStatistics.empty() : shardStatistics; + } + + public Long getTotalHits() { + return totalHits; + } private CreateScrollResponse(final CreateScrollResponse.Builder builder) { this.scrollId = builder.scrollId; this.scrollCreationTime = builder.scrollCreationTime; this.documents = builder.documents; + this.shardStatistics = builder.shardStatistics; + this.totalHits = builder.totalHits; } public static CreateScrollResponse.Builder builder() { @@ -39,6 +53,8 @@ public static class Builder { private List documents; private String scrollId; private Long scrollCreationTime; + private SearchShardStatistics shardStatistics; + private Long totalHits; public Builder() { @@ -59,6 +75,16 @@ public CreateScrollResponse.Builder withCreationTime(final Long scrollCreationTi return this; } + public CreateScrollResponse.Builder withShardStatistics(final SearchShardStatistics shardStatistics) { + this.shardStatistics = shardStatistics; + return this; + } + + public CreateScrollResponse.Builder withTotalHits(final Long totalHits) { + this.totalHits = totalHits; + return this; + } + public CreateScrollResponse build() { return new CreateScrollResponse(this); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchScrollResponse.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchScrollResponse.java index 8f5809c03f..c6969ebda9 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchScrollResponse.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchScrollResponse.java @@ -12,16 +12,24 @@ public class SearchScrollResponse { private final String scrollId; private final List documents; + private final SearchShardStatistics shardStatistics; public String getScrollId() { return scrollId; } - public List getDocuments() { return documents; } + public List getDocuments() { + return documents; + } + + public SearchShardStatistics getShardStatistics() { + return shardStatistics == null ? SearchShardStatistics.empty() : shardStatistics; + } private SearchScrollResponse(final SearchScrollResponse.Builder builder) { this.scrollId = builder.scrollId; this.documents = builder.documents; + this.shardStatistics = builder.shardStatistics; } public static SearchScrollResponse.Builder builder() { @@ -32,6 +40,7 @@ public static class Builder { private String scrollId; private List documents; + private SearchShardStatistics shardStatistics; public Builder() { @@ -47,6 +56,11 @@ public SearchScrollResponse.Builder withDocuments(final List documents) { return this; } + public SearchScrollResponse.Builder withShardStatistics(final SearchShardStatistics shardStatistics) { + this.shardStatistics = shardStatistics; + return this; + } + public SearchScrollResponse build() { return new SearchScrollResponse(this); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchShardStatistics.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchShardStatistics.java new file mode 100644 index 0000000000..e039efff66 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchShardStatistics.java @@ -0,0 +1,228 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.regex.Pattern; + +/** + * Plugin-side view of shard statistics returned by a search request. Keeps the + * plugin decoupled from the underlying client (OpenSearch or Elasticsearch) types. + * + * Failure reasons are aggregated into a normalized map of reason to occurrence + * count so instances remain small even when a cluster returns many shard + * failures. The map is capped at {@link #MAX_DISTINCT_REASONS} distinct keys; + * further unique reasons increment the {@link #OVERFLOW_REASON_KEY} bucket + * instead of growing the map. + */ +public class SearchShardStatistics { + + public static final int MAX_DISTINCT_REASONS = 20; + public static final String OVERFLOW_REASON_KEY = "__other__"; + + private static final Pattern UUID_PATTERN = Pattern.compile( + "\\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\\b"); + private static final Pattern SHARD_ID_PATTERN = Pattern.compile("\\[[^\\]]*\\]\\[\\d+\\]"); + private static final Pattern NODE_ID_PATTERN = Pattern.compile("node\\[[^\\]]+\\]"); + private static final Pattern IP_PORT_PATTERN = Pattern.compile( + "\\b\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}(:\\d{1,5})?\\b"); + private static final int MAX_REASON_KEY_LENGTH = 512; + private static final String UNKNOWN_REASON = "unknown"; + + private static final SearchShardStatistics EMPTY = new SearchShardStatistics(0, 0, 0, 0, Collections.emptyMap()); + + private final int total; + private final int successful; + private final int failed; + private final int skipped; + private final Map failureReasonCounts; + + public SearchShardStatistics(final int total, + final int successful, + final int failed, + final int skipped, + final Map failureReasonCounts) { + this.total = total; + this.successful = successful; + this.failed = failed; + this.skipped = skipped; + this.failureReasonCounts = failureReasonCounts == null + ? Collections.emptyMap() + : Collections.unmodifiableMap(new LinkedHashMap<>(failureReasonCounts)); + } + + public static SearchShardStatistics empty() { + return EMPTY; + } + + public int getTotal() { + return total; + } + + public int getSuccessful() { + return successful; + } + + public int getFailed() { + return failed; + } + + public int getSkipped() { + return skipped; + } + + public Map getFailureReasonCounts() { + return failureReasonCounts; + } + + public boolean hasFailures() { + return failed > 0 || !failureReasonCounts.isEmpty(); + } + + /** + * Normalize a raw failure reason into a stable key by stripping shard ids, + * node ids, and UUIDs so unrelated occurrences of the same underlying error + * collapse to a single map key. + */ + public static String normalizeReason(final String rawReason) { + if (rawReason == null || rawReason.isEmpty()) { + return UNKNOWN_REASON; + } + String normalized = rawReason; + normalized = SHARD_ID_PATTERN.matcher(normalized).replaceAll("[shard]"); + normalized = NODE_ID_PATTERN.matcher(normalized).replaceAll("node[?]"); + normalized = UUID_PATTERN.matcher(normalized).replaceAll(""); + normalized = IP_PORT_PATTERN.matcher(normalized).replaceAll(""); + normalized = normalized.trim(); + if (normalized.isEmpty()) { + return UNKNOWN_REASON; + } + if (normalized.length() > MAX_REASON_KEY_LENGTH) { + normalized = normalized.substring(0, MAX_REASON_KEY_LENGTH); + } + return normalized; + } + + /** + * Normalize a shard-failure cause expressed as a type + message pair. + * Typically {@code type} is the error class from the client (for example + * {@code shard_failure} or {@code illegal_argument_exception}) and + * {@code message} is the human-readable reason string. + */ + public static String normalizeReason(final String type, final String message) { + final boolean hasType = type != null && !type.isEmpty(); + final boolean hasMessage = message != null && !message.isEmpty(); + if (!hasType && !hasMessage) { + return UNKNOWN_REASON; + } + if (hasType && hasMessage) { + return normalizeReason(type + ": " + message); + } + return normalizeReason(hasType ? type : message); + } + + /** + * Increment the count for a normalized reason in the given map, respecting + * the {@link #MAX_DISTINCT_REASONS} cap. Existing keys always increment. + * New keys are added until the cap is reached; after that they fold into + * {@link #OVERFLOW_REASON_KEY}. + */ + public static void incrementFailureReasonCount(final Map counts, + final String normalizedReason, + final long delta) { + Objects.requireNonNull(counts, "counts"); + if (normalizedReason == null || delta <= 0) { + return; + } + if (counts.containsKey(normalizedReason)) { + counts.merge(normalizedReason, delta, Long::sum); + return; + } + if (counts.size() < MAX_DISTINCT_REASONS) { + counts.put(normalizedReason, delta); + return; + } + counts.merge(OVERFLOW_REASON_KEY, delta, Long::sum); + } + + /** + * Merge another map of reason to count into the given counts map, respecting + * the cap. Useful when aggregating per-response statistics into a running + * total (for example in persisted progress state). + */ + public static void mergeFailureReasonCounts(final Map counts, + final Map toMerge) { + if (toMerge == null || toMerge.isEmpty()) { + return; + } + for (final Map.Entry entry : toMerge.entrySet()) { + incrementFailureReasonCount(counts, entry.getKey(), entry.getValue() == null ? 0L : entry.getValue()); + } + } + + /** + * Utility: convert a nullable {@link Number} to int, defaulting to 0. + * Useful when extracting shard counts from client response types that + * may return boxed numbers or null. + */ + public static int numberOrZero(final Number number) { + return number == null ? 0 : number.intValue(); + } + + /** + * Build a {@link SearchShardStatistics} from raw shard counts and a list + * of (type, message) pairs representing individual shard failures. Each + * pair is normalized and aggregated into the capped failure-reason map. + * + * @param total total shard count (nullable) + * @param successful successful shard count (nullable) + * @param failed failed shard count (nullable) + * @param skipped skipped shard count (nullable) + * @param failures list of [type, message] pairs; may be null or empty + */ + public static SearchShardStatistics fromShardCounts(final Number total, + final Number successful, + final Number failed, + final Number skipped, + final List failures) { + final Map failureReasonCounts = new LinkedHashMap<>(); + if (failures != null) { + for (final String[] pair : failures) { + final String type = pair != null && pair.length > 0 ? pair[0] : null; + final String message = pair != null && pair.length > 1 ? pair[1] : null; + final String key = normalizeReason(type, message); + incrementFailureReasonCount(failureReasonCounts, key, 1L); + } + } + return new SearchShardStatistics( + numberOrZero(total), + numberOrZero(successful), + numberOrZero(failed), + numberOrZero(skipped), + failureReasonCounts); + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (!(o instanceof SearchShardStatistics)) return false; + final SearchShardStatistics that = (SearchShardStatistics) o; + return total == that.total && successful == that.successful && failed == that.failed + && skipped == that.skipped && Objects.equals(failureReasonCounts, that.failureReasonCounts); + } + + @Override + public int hashCode() { + return Objects.hash(total, successful, failed, skipped, failureReasonCounts); + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchWithSearchAfterResults.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchWithSearchAfterResults.java index 36fedf1acf..65896b53c5 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchWithSearchAfterResults.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchWithSearchAfterResults.java @@ -12,7 +12,9 @@ public class SearchWithSearchAfterResults { private final List documents; private final List nextSearchAfter; - + private final SearchShardStatistics shardStatistics; + private final Long totalHits; + public List getDocuments() { return documents; } @@ -21,9 +23,19 @@ public List getNextSearchAfter() { return nextSearchAfter; } + public SearchShardStatistics getShardStatistics() { + return shardStatistics == null ? SearchShardStatistics.empty() : shardStatistics; + } + + public Long getTotalHits() { + return totalHits; + } + private SearchWithSearchAfterResults(final SearchWithSearchAfterResults.Builder builder) { this.documents = builder.documents; this.nextSearchAfter = builder.nextSearchAfter; + this.shardStatistics = builder.shardStatistics; + this.totalHits = builder.totalHits; } public static SearchWithSearchAfterResults.Builder builder() { @@ -34,6 +46,8 @@ public static class Builder { private List documents; private List nextSearchAfter; + private SearchShardStatistics shardStatistics; + private Long totalHits; public Builder() { @@ -49,6 +63,15 @@ public SearchWithSearchAfterResults.Builder withNextSearchAfter(final List seed = new LinkedHashMap<>(); + seed.put("reason", 2L); + state.setFailureReasonCounts(seed); + assertThat(state.getFailureReasonCounts().get("reason"), equalTo(2L)); + + state.setFailureReasonCounts(null); + + assertThat(state.getFailureReasonCounts(), notNullValue()); + assertThat(state.getFailureReasonCounts().isEmpty(), is(true)); + } + + @Test + void recordShardFailures_with_null_is_a_no_op() { + final OpenSearchIndexProgressState state = new OpenSearchIndexProgressState(); + + state.recordShardFailures(null); + + assertThat(state.isHadSearchFailures(), is(false)); + assertThat(state.getFailureReasonCounts().isEmpty(), is(true)); + } + + @Test + void recordShardFailures_with_no_failures_is_a_no_op() { + final OpenSearchIndexProgressState state = new OpenSearchIndexProgressState(); + + state.recordShardFailures(SearchShardStatistics.empty()); + + assertThat(state.isHadSearchFailures(), is(false)); + assertThat(state.getFailureReasonCounts().isEmpty(), is(true)); + } + + @Test + void recordShardFailures_sets_flag_and_merges_reason_counts() { + final OpenSearchIndexProgressState state = new OpenSearchIndexProgressState(); + + final Map firstBatch = new LinkedHashMap<>(); + firstBatch.put("shard_failure: timed out", 2L); + firstBatch.put("shard_failure: rejected", 1L); + state.recordShardFailures(new SearchShardStatistics(5, 2, 3, 0, firstBatch)); + + final Map secondBatch = new LinkedHashMap<>(); + secondBatch.put("shard_failure: timed out", 4L); + secondBatch.put("shard_failure: other", 1L); + state.recordShardFailures(new SearchShardStatistics(5, 3, 2, 0, secondBatch)); + + assertThat(state.isHadSearchFailures(), is(true)); + assertThat(state.getFailureReasonCounts().get("shard_failure: timed out"), equalTo(6L)); + assertThat(state.getFailureReasonCounts().get("shard_failure: rejected"), equalTo(1L)); + assertThat(state.getFailureReasonCounts().get("shard_failure: other"), equalTo(1L)); + } + + @Test + void recordShardFailures_enforces_the_20_key_cap_across_merges() { + final OpenSearchIndexProgressState state = new OpenSearchIndexProgressState(); + + for (int i = 0; i < SearchShardStatistics.MAX_DISTINCT_REASONS; i++) { + final Map batch = new LinkedHashMap<>(); + batch.put("reason-" + i, 1L); + state.recordShardFailures(new SearchShardStatistics(1, 0, 1, 0, batch)); + } + + final Map overflowBatch = new LinkedHashMap<>(); + overflowBatch.put("overflow-a", 1L); + overflowBatch.put("overflow-b", 2L); + state.recordShardFailures(new SearchShardStatistics(1, 0, 1, 0, overflowBatch)); + + assertThat(state.getFailureReasonCounts().size(), equalTo(SearchShardStatistics.MAX_DISTINCT_REASONS + 1)); + assertThat(state.getFailureReasonCounts().get(SearchShardStatistics.OVERFLOW_REASON_KEY), equalTo(3L)); + assertThat(state.getFailureReasonCounts().get("overflow-a"), nullValue()); + } + + @Test + void recordRequestFailure_normalizes_exception_into_reason_and_increments_count() { + final OpenSearchIndexProgressState state = new OpenSearchIndexProgressState(); + + state.recordRequestFailure(new SocketTimeoutException("read timed out")); + state.recordRequestFailure(new SocketTimeoutException("read timed out")); + state.recordRequestFailure(new IOException("connection reset")); + + assertThat(state.isHadSearchFailures(), is(true)); + assertThat(state.getFailureReasonCounts().get("SocketTimeoutException: read timed out"), equalTo(2L)); + assertThat(state.getFailureReasonCounts().get("IOException: connection reset"), equalTo(1L)); + } + + @Test + void recordRequestFailure_with_null_throwable_sets_flag_with_unknown_reason() { + final OpenSearchIndexProgressState state = new OpenSearchIndexProgressState(); + + state.recordRequestFailure(null); + + assertThat(state.isHadSearchFailures(), is(true)); + assertThat(state.getFailureReasonCounts().get("unknown"), equalTo(1L)); + } + + @Test + void recordRequestFailure_with_null_message_uses_class_name_only() { + final OpenSearchIndexProgressState state = new OpenSearchIndexProgressState(); + + state.recordRequestFailure(new RuntimeException((String) null)); + + assertThat(state.isHadSearchFailures(), is(true)); + assertThat(state.getFailureReasonCounts().get("RuntimeException"), equalTo(1L)); + } + + @Test + void setHadSearchFailures_invalidates_aggregator_cache() { + final OpenSearchIndexProgressState state = new OpenSearchIndexProgressState(); + state.recordRequestFailure(new RuntimeException("boom")); + assertThat(state.isHadSearchFailures(), is(true)); + + state.setHadSearchFailures(false); + + assertThat(state.isHadSearchFailures(), is(false)); + } + + @Test + void setFailureReasonCounts_invalidates_aggregator_and_uses_new_counts() { + final OpenSearchIndexProgressState state = new OpenSearchIndexProgressState(); + state.recordRequestFailure(new RuntimeException("initial")); + + final Map replacement = new LinkedHashMap<>(); + replacement.put("replaced", 99L); + state.setFailureReasonCounts(replacement); + + assertThat(state.getFailureReasonCounts().get("replaced"), equalTo(99L)); + assertThat(state.getFailureReasonCounts().get("RuntimeException: initial"), nullValue()); + } + + @Test + void recording_after_setter_accumulates_on_top_of_new_state() { + final OpenSearchIndexProgressState state = new OpenSearchIndexProgressState(); + + final Map seed = new LinkedHashMap<>(); + seed.put("existing", 5L); + state.setFailureReasonCounts(seed); + state.setHadSearchFailures(true); + + state.recordRequestFailure(new RuntimeException("new error")); + + assertThat(state.getFailureReasonCounts().get("existing"), equalTo(5L)); + assertThat(state.getFailureReasonCounts().get("RuntimeException: new error"), equalTo(1L)); + } + + @Test + void jackson_constructor_with_pre_populated_data_allows_further_recording() { + final Map existing = new LinkedHashMap<>(); + existing.put("prior_error", 10L); + final OpenSearchIndexProgressState state = new OpenSearchIndexProgressState( + null, null, null, null, true, existing); + + state.recordRequestFailure(new RuntimeException("new")); + + assertThat(state.isHadSearchFailures(), is(true)); + assertThat(state.getFailureReasonCounts().get("prior_error"), equalTo(10L)); + assertThat(state.getFailureReasonCounts().get("RuntimeException: new"), equalTo(1L)); + } +} diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/ShardFailureAggregatorTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/ShardFailureAggregatorTest.java new file mode 100644 index 0000000000..4d9d360fcb --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/ShardFailureAggregatorTest.java @@ -0,0 +1,174 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.source.opensearch; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchShardStatistics; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +class ShardFailureAggregatorTest { + + @Test + void new_aggregator_has_no_failures() { + final ShardFailureAggregator aggregator = new ShardFailureAggregator(); + + assertThat(aggregator.hadFailures(), is(false)); + assertThat(aggregator.getFailureReasonCounts().isEmpty(), is(true)); + } + + @Test + void constructor_with_existing_state_restores_correctly() { + final Map existing = new LinkedHashMap<>(); + existing.put("reason-a", 5L); + existing.put("reason-b", 3L); + + final ShardFailureAggregator aggregator = new ShardFailureAggregator(true, existing); + + assertThat(aggregator.hadFailures(), is(true)); + assertThat(aggregator.getFailureReasonCounts().get("reason-a"), equalTo(5L)); + assertThat(aggregator.getFailureReasonCounts().get("reason-b"), equalTo(3L)); + } + + @Test + void constructor_with_null_counts_initializes_empty_map() { + final ShardFailureAggregator aggregator = new ShardFailureAggregator(false, null); + + assertThat(aggregator.hadFailures(), is(false)); + assertThat(aggregator.getFailureReasonCounts().isEmpty(), is(true)); + } + + @Test + void recordShardFailures_with_null_is_a_no_op() { + final ShardFailureAggregator aggregator = new ShardFailureAggregator(); + + aggregator.recordShardFailures(null); + + assertThat(aggregator.hadFailures(), is(false)); + assertThat(aggregator.getFailureReasonCounts().isEmpty(), is(true)); + } + + @Test + void recordShardFailures_with_no_failures_is_a_no_op() { + final ShardFailureAggregator aggregator = new ShardFailureAggregator(); + + aggregator.recordShardFailures(SearchShardStatistics.empty()); + + assertThat(aggregator.hadFailures(), is(false)); + } + + @Test + void recordShardFailures_sets_flag_and_merges_counts() { + final ShardFailureAggregator aggregator = new ShardFailureAggregator(); + + final Map batch1 = new LinkedHashMap<>(); + batch1.put("timeout", 2L); + aggregator.recordShardFailures(new SearchShardStatistics(5, 3, 2, 0, batch1)); + + final Map batch2 = new LinkedHashMap<>(); + batch2.put("timeout", 1L); + batch2.put("rejected", 3L); + aggregator.recordShardFailures(new SearchShardStatistics(5, 2, 3, 0, batch2)); + + assertThat(aggregator.hadFailures(), is(true)); + assertThat(aggregator.getFailureReasonCounts().get("timeout"), equalTo(3L)); + assertThat(aggregator.getFailureReasonCounts().get("rejected"), equalTo(3L)); + } + + @Test + void recordShardFailures_respects_max_distinct_reasons_cap() { + final ShardFailureAggregator aggregator = new ShardFailureAggregator(); + + for (int i = 0; i < ShardFailureAggregator.MAX_DISTINCT_REASONS; i++) { + final Map batch = new LinkedHashMap<>(); + batch.put("reason-" + i, 1L); + aggregator.recordShardFailures(new SearchShardStatistics(1, 0, 1, 0, batch)); + } + + final Map overflow = new LinkedHashMap<>(); + overflow.put("new-reason-a", 2L); + overflow.put("new-reason-b", 3L); + aggregator.recordShardFailures(new SearchShardStatistics(1, 0, 1, 0, overflow)); + + assertThat(aggregator.getFailureReasonCounts().size(), + equalTo(ShardFailureAggregator.MAX_DISTINCT_REASONS + 1)); + assertThat(aggregator.getFailureReasonCounts().get(ShardFailureAggregator.OVERFLOW_REASON_KEY), equalTo(5L)); + assertThat(aggregator.getFailureReasonCounts().get("new-reason-a"), nullValue()); + } + + @Test + void recordRequestFailure_normalizes_exception_class_and_message() { + final ShardFailureAggregator aggregator = new ShardFailureAggregator(); + + aggregator.recordRequestFailure(new IOException("connection reset")); + aggregator.recordRequestFailure(new IOException("connection reset")); + + assertThat(aggregator.hadFailures(), is(true)); + assertThat(aggregator.getFailureReasonCounts().get("IOException: connection reset"), equalTo(2L)); + } + + @Test + void recordRequestFailure_with_null_throwable_records_unknown() { + final ShardFailureAggregator aggregator = new ShardFailureAggregator(); + + aggregator.recordRequestFailure(null); + + assertThat(aggregator.hadFailures(), is(true)); + assertThat(aggregator.getFailureReasonCounts().get("unknown"), equalTo(1L)); + } + + @Test + void recordRequestFailure_with_null_message_uses_class_name_only() { + final ShardFailureAggregator aggregator = new ShardFailureAggregator(); + + aggregator.recordRequestFailure(new RuntimeException((String) null)); + + assertThat(aggregator.getFailureReasonCounts().get("RuntimeException"), equalTo(1L)); + } + + @Test + void recordRequestFailure_normalizes_ip_addresses_in_message() { + final ShardFailureAggregator aggregator = new ShardFailureAggregator(); + + aggregator.recordRequestFailure(new IOException("Connection refused to 10.0.1.5:9300")); + + assertThat(aggregator.getFailureReasonCounts().get("IOException: Connection refused to "), equalTo(1L)); + } + + @Test + void getFailureReasonCounts_returns_unmodifiable_view() { + final ShardFailureAggregator aggregator = new ShardFailureAggregator(); + aggregator.recordRequestFailure(new RuntimeException("boom")); + + org.junit.jupiter.api.Assertions.assertThrows(UnsupportedOperationException.class, + () -> aggregator.getFailureReasonCounts().put("hack", 1L)); + } + + @Test + void accumulates_both_shard_failures_and_request_failures() { + final ShardFailureAggregator aggregator = new ShardFailureAggregator(); + + final Map shardBatch = new LinkedHashMap<>(); + shardBatch.put("shard_failure: timed out", 2L); + aggregator.recordShardFailures(new SearchShardStatistics(5, 3, 2, 0, shardBatch)); + aggregator.recordRequestFailure(new IOException("connection refused")); + + assertThat(aggregator.hadFailures(), is(true)); + assertThat(aggregator.getFailureReasonCounts().get("shard_failure: timed out"), equalTo(2L)); + assertThat(aggregator.getFailureReasonCounts().get("IOException: connection refused"), equalTo(1L)); + } +} diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetricsTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetricsTest.java new file mode 100644 index 0000000000..9d436645a2 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetricsTest.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.metrics; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Timer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class OpenSearchSourcePluginMetricsTest { + + @Mock + private PluginMetrics pluginMetrics; + + private Counter searchRequestsFailedCounter; + private Counter searchShardsFailedCounter; + private Counter indicesCompletedWithFailuresCounter; + + private OpenSearchSourcePluginMetrics objectUnderTest; + + @BeforeEach + void setup() { + searchRequestsFailedCounter = mock(Counter.class); + searchShardsFailedCounter = mock(Counter.class); + indicesCompletedWithFailuresCounter = mock(Counter.class); + + when(pluginMetrics.counter("documentsProcessed")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("indicesProcessed")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("processingErrors")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("credentialsChanged")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("clientRefreshErrors")).thenReturn(mock(Counter.class)); + when(pluginMetrics.counter("searchRequestsFailed")).thenReturn(searchRequestsFailedCounter); + when(pluginMetrics.counter("searchShardsFailed")).thenReturn(searchShardsFailedCounter); + when(pluginMetrics.counter("indicesCompletedWithFailures")).thenReturn(indicesCompletedWithFailuresCounter); + when(pluginMetrics.timer("indexProcessingTime")).thenReturn(mock(Timer.class)); + when(pluginMetrics.summary("bytesReceived")).thenReturn(mock(DistributionSummary.class)); + when(pluginMetrics.summary("bytesProcessed")).thenReturn(mock(DistributionSummary.class)); + + objectUnderTest = OpenSearchSourcePluginMetrics.create(pluginMetrics); + } + + @Test + void getSearchRequestsFailedCounter_returns_initialized_counter() { + assertThat(objectUnderTest.getSearchRequestsFailedCounter(), notNullValue()); + assertThat(objectUnderTest.getSearchRequestsFailedCounter(), sameInstance(searchRequestsFailedCounter)); + } + + @Test + void getSearchShardsFailedCounter_returns_initialized_counter() { + assertThat(objectUnderTest.getSearchShardsFailedCounter(), notNullValue()); + assertThat(objectUnderTest.getSearchShardsFailedCounter(), sameInstance(searchShardsFailedCounter)); + } + + @Test + void getIndicesCompletedWithFailuresCounter_returns_initialized_counter() { + assertThat(objectUnderTest.getIndicesCompletedWithFailuresCounter(), notNullValue()); + assertThat(objectUnderTest.getIndicesCompletedWithFailuresCounter(), sameInstance(indicesCompletedWithFailuresCounter)); + } +} diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java index a162e352c5..ad6de9aa6d 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java @@ -31,11 +31,14 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.NoSearchContextSearchRequest; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchShardStatistics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults; import java.time.Duration; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -45,6 +48,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -189,7 +193,16 @@ void run_with_getNextPartition_with_non_empty_partition_processes_and_closes_tha when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); final SearchWithSearchAfterResults searchWithSearchAfterResults = mock(SearchWithSearchAfterResults.class); - when(searchWithSearchAfterResults.getNextSearchAfter()).thenReturn(Collections.singletonList(UUID.randomUUID().toString())); + final List firstPageSearchAfter = Collections.singletonList(UUID.randomUUID().toString()); + // Second page returns null nextSearchAfter, signalling the end of the index under the new termination contract. + // getNextSearchAfter is called ~3 times while processing the first page and once at the + // start of the second page. Returning a non-null value for those first 3 calls drives the + // second search_after request to use the captured cursor; null afterwards terminates. + when(searchWithSearchAfterResults.getNextSearchAfter()) + .thenReturn(firstPageSearchAfter) + .thenReturn(firstPageSearchAfter) + .thenReturn(firstPageSearchAfter) + .thenReturn(null); final Event testEvent1 = mock(Event.class); final Event testEvent2 = mock(Event.class); final Event testEvent3 = mock(Event.class); @@ -243,7 +256,7 @@ void run_with_getNextPartition_with_non_empty_partition_processes_and_closes_tha assertThat(noSearchContextSearchRequests.get(1), notNullValue()); assertThat(noSearchContextSearchRequests.get(1).getIndex(), equalTo(partitionKey)); assertThat(noSearchContextSearchRequests.get(1).getPaginationSize(), equalTo(2)); - assertThat(noSearchContextSearchRequests.get(1).getSearchAfter(), equalTo(searchWithSearchAfterResults.getNextSearchAfter())); + assertThat(noSearchContextSearchRequests.get(1).getSearchAfter(), equalTo(firstPageSearchAfter)); verify(bytesReceivedSummary).record(10L); verify(bytesReceivedSummary).record(20L); @@ -284,7 +297,15 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); final SearchWithSearchAfterResults searchWithSearchAfterResults = mock(SearchWithSearchAfterResults.class); - when(searchWithSearchAfterResults.getNextSearchAfter()).thenReturn(Collections.singletonList(UUID.randomUUID().toString())); + final List firstPageSearchAfter = Collections.singletonList(UUID.randomUUID().toString()); + // getNextSearchAfter is called ~3 times while processing the first page and once at the + // start of the second page. Returning a non-null value for those first 3 calls drives the + // second search_after request to use the captured cursor; null afterwards terminates. + when(searchWithSearchAfterResults.getNextSearchAfter()) + .thenReturn(firstPageSearchAfter) + .thenReturn(firstPageSearchAfter) + .thenReturn(firstPageSearchAfter) + .thenReturn(null); final Event testEvent1 = mock(Event.class); final Event testEvent2 = mock(Event.class); final Event testEvent3 = mock(Event.class); @@ -338,7 +359,7 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa assertThat(noSearchContextSearchRequests.get(1), notNullValue()); assertThat(noSearchContextSearchRequests.get(1).getIndex(), equalTo(partitionKey)); assertThat(noSearchContextSearchRequests.get(1).getPaginationSize(), equalTo(2)); - assertThat(noSearchContextSearchRequests.get(1).getSearchAfter(), equalTo(searchWithSearchAfterResults.getNextSearchAfter())); + assertThat(noSearchContextSearchRequests.get(1).getSearchAfter(), equalTo(firstPageSearchAfter)); verify(acknowledgementSet).complete(); @@ -353,6 +374,156 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa verifyNoInteractions(processingErrorsCounter); } + @Test + void run_continues_past_short_page_and_terminates_when_nextSearchAfter_is_null() throws Exception { + mockTimerCallable(); + + final SourcePartition sourcePartition = mock(SourcePartition.class); + final String partitionKey = UUID.randomUUID().toString(); + when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); + when(sourcePartition.getPartitionState()).thenReturn(Optional.empty()); + + final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); + when(searchConfiguration.getBatchSize()).thenReturn(2); + when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); + + final List firstPageSearchAfter = Collections.singletonList(UUID.randomUUID().toString()); + + final Event testEvent1 = mock(Event.class); + final Event testEvent2 = mock(Event.class); + final JsonNode testData1 = mock(JsonNode.class); + final JsonNode testData2 = mock(JsonNode.class); + when(testEvent1.getJsonNode()).thenReturn(testData1); + when(testEvent2.getJsonNode()).thenReturn(testData2); + when(objectMapper.writeValueAsBytes(testData1)).thenReturn(new byte[10]); + when(objectMapper.writeValueAsBytes(testData2)).thenReturn(new byte[20]); + + // First page returns a SHORT page (1 doc < batch_size=2) with a cursor — pagination must continue. + // Second page returns null nextSearchAfter to terminate. + when(searchAccessor.searchWithoutSearchContext(any(NoSearchContextSearchRequest.class))).thenAnswer(invocation -> { + final NoSearchContextSearchRequest request = invocation.getArgument(0); + final SearchWithSearchAfterResults results = mock(SearchWithSearchAfterResults.class); + if (request.getSearchAfter() == null) { + when(results.getNextSearchAfter()).thenReturn(firstPageSearchAfter); + when(results.getDocuments()).thenReturn(List.of(testEvent1)); + return results; + } else { + when(results.getNextSearchAfter()).thenReturn(null); + when(results.getDocuments()).thenReturn(List.of(testEvent2)); + return results; + } + }); + + doNothing().when(bufferAccumulator).add(any(Record.class)); + doNothing().when(bufferAccumulator).flush(); + + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)) + .thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); + + final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); + when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1); + when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO); + when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); + + doNothing().when(sourceCoordinator).closePartition(partitionKey, Duration.ZERO, 1, false); + + final Future future = executorService.submit(() -> createObjectUnderTest().run()); + await().atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> verify(indicesProcessedCounter).increment()); + + executorService.shutdown(); + future.cancel(true); + assertThat(executorService.awaitTermination(1, TimeUnit.SECONDS), equalTo(true)); + + // Key assertion: 2 search requests were made (past the short page). + // The old logic (size == batchSize) would have stopped after the short page. + verify(searchAccessor, times(2)).searchWithoutSearchContext(any(NoSearchContextSearchRequest.class)); + verify(documentsProcessedCounter, times(2)).increment(); + } + + @Test + void run_when_page_has_shard_failures_records_them_to_metrics_and_continues_paginating() throws Exception { + mockTimerCallable(); + + final Counter searchShardsFailedCounter = mock(Counter.class); + when(openSearchSourcePluginMetrics.getSearchShardsFailedCounter()).thenReturn(searchShardsFailedCounter); + + final SourcePartition sourcePartition = mock(SourcePartition.class); + final String partitionKey = UUID.randomUUID().toString(); + when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); + when(sourcePartition.getPartitionState()).thenReturn(Optional.empty()); + + final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); + when(searchConfiguration.getBatchSize()).thenReturn(2); + when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); + + final SearchWithSearchAfterResults searchWithSearchAfterResults = mock(SearchWithSearchAfterResults.class); + final List firstPageSearchAfter = Collections.singletonList(UUID.randomUUID().toString()); + when(searchWithSearchAfterResults.getNextSearchAfter()) + .thenReturn(firstPageSearchAfter) + .thenReturn(firstPageSearchAfter) + .thenReturn(firstPageSearchAfter) + .thenReturn(null); + + final Event testEvent1 = mock(Event.class); + final Event testEvent2 = mock(Event.class); + final Event testEvent3 = mock(Event.class); + final JsonNode testData1 = mock(JsonNode.class); + final JsonNode testData2 = mock(JsonNode.class); + final JsonNode testData3 = mock(JsonNode.class); + when(testEvent1.getJsonNode()).thenReturn(testData1); + when(testEvent2.getJsonNode()).thenReturn(testData2); + when(testEvent3.getJsonNode()).thenReturn(testData3); + when(objectMapper.writeValueAsBytes(testData1)).thenReturn(new byte[10]); + when(objectMapper.writeValueAsBytes(testData2)).thenReturn(new byte[20]); + when(objectMapper.writeValueAsBytes(testData3)).thenReturn(new byte[30]); + when(searchWithSearchAfterResults.getDocuments()) + .thenReturn(List.of(testEvent1, testEvent2)) + .thenReturn(List.of(testEvent1, testEvent2)) + .thenReturn(List.of(testEvent3)) + .thenReturn(List.of(testEvent3)); + + // First page has shard failures, second page does not. + final Map firstPageReasons = new LinkedHashMap<>(); + firstPageReasons.put("shard_failure: timed out", 3L); + final SearchShardStatistics firstPageStats = new SearchShardStatistics(5, 2, 3, 0, firstPageReasons); + when(searchWithSearchAfterResults.getShardStatistics()) + .thenReturn(firstPageStats) + .thenReturn(SearchShardStatistics.empty()); + + when(searchAccessor.searchWithoutSearchContext(any(NoSearchContextSearchRequest.class))) + .thenReturn(searchWithSearchAfterResults); + + doNothing().when(bufferAccumulator).add(any(Record.class)); + doNothing().when(bufferAccumulator).flush(); + + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)) + .thenReturn(Optional.of(sourcePartition)) + .thenReturn(Optional.empty()); + + final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); + when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1); + when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO); + when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); + + doNothing().when(sourceCoordinator).closePartition(partitionKey, Duration.ZERO, 1, false); + + final Future future = executorService.submit(() -> createObjectUnderTest().run()); + // Wait for the worker thread to finish processing the partition before asserting. + await().atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> verify(indicesProcessedCounter).increment()); + + executorService.shutdown(); + future.cancel(true); + assertThat(executorService.awaitTermination(1, TimeUnit.SECONDS), equalTo(true)); + + // Shard failures from the first page are recorded once and pagination still continues + // through the second page (two searchWithoutSearchContext calls total). + verify(searchShardsFailedCounter).increment(3.0); + verify(searchAccessor, times(2)).searchWithoutSearchContext(any(NoSearchContextSearchRequest.class)); + verify(documentsProcessedCounter, times(3)).increment(); + } + private void mockTimerCallable() { doAnswer(a -> { a.getArgument(0).run(); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java index 29b1bea4b0..f39f2b5757 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java @@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -166,8 +167,8 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_ when(searchConfiguration.getBatchSize()).thenReturn(2); when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); - final SearchWithSearchAfterResults searchWithSearchAfterResults = mock(SearchWithSearchAfterResults.class); - when(searchWithSearchAfterResults.getNextSearchAfter()).thenReturn(Collections.singletonList(UUID.randomUUID().toString())); + final List firstPageSearchAfter = Collections.singletonList(UUID.randomUUID().toString()); + final Event testEvent1 = mock(Event.class); final Event testEvent2 = mock(Event.class); final Event testEvent3 = mock(Event.class); @@ -180,11 +181,27 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_ when(objectMapper.writeValueAsBytes(testData1)).thenReturn(new byte[10]); when(objectMapper.writeValueAsBytes(testData2)).thenReturn(new byte[20]); when(objectMapper.writeValueAsBytes(testData3)).thenReturn(new byte[30]); - when(searchWithSearchAfterResults.getDocuments()).thenReturn(List.of(testEvent1, testEvent2)).thenReturn(List.of(testEvent1, testEvent2)) - .thenReturn(List.of(testEvent3)).thenReturn(List.of(testEvent3)); + // Use Answer to transition based on search request content: the first request has + // no searchAfter (initial page), while the second has a cursor. This condition-based + // approach avoids relying on internal call counts. final ArgumentCaptor searchPointInTimeRequestArgumentCaptor = ArgumentCaptor.forClass(SearchPointInTimeRequest.class); - when(searchAccessor.searchWithPit(searchPointInTimeRequestArgumentCaptor.capture())).thenReturn(searchWithSearchAfterResults); + when(searchAccessor.searchWithPit(searchPointInTimeRequestArgumentCaptor.capture())).thenAnswer(invocation -> { + final SearchPointInTimeRequest request = invocation.getArgument(0); + if (request.getSearchAfter() == null) { + // First page: has documents and a cursor to continue + final SearchWithSearchAfterResults results = mock(SearchWithSearchAfterResults.class); + when(results.getNextSearchAfter()).thenReturn(firstPageSearchAfter); + when(results.getDocuments()).thenReturn(List.of(testEvent1, testEvent2)); + return results; + } else { + // Subsequent page: has documents but null cursor signals end-of-index + final SearchWithSearchAfterResults results = mock(SearchWithSearchAfterResults.class); + when(results.getNextSearchAfter()).thenReturn(null); + when(results.getDocuments()).thenReturn(List.of(testEvent3)); + return results; + } + }); doNothing().when(bufferAccumulator).add(any(Record.class)); doNothing().when(bufferAccumulator).flush(); @@ -231,7 +248,7 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_ assertThat(searchPointInTimeRequestList.get(1).getPitId(), equalTo(pitId)); assertThat(searchPointInTimeRequestList.get(1).getKeepAlive(), equalTo(EXTEND_KEEP_ALIVE_TIME)); assertThat(searchPointInTimeRequestList.get(1).getPaginationSize(), equalTo(2)); - assertThat(searchPointInTimeRequestList.get(1).getSearchAfter(), equalTo(searchWithSearchAfterResults.getNextSearchAfter())); + assertThat(searchPointInTimeRequestList.get(1).getSearchAfter(), equalTo(firstPageSearchAfter)); final DeletePointInTimeRequest deletePointInTimeRequest = deleteRequestArgumentCaptor.getValue(); @@ -284,8 +301,8 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa when(searchConfiguration.getBatchSize()).thenReturn(2); when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); - final SearchWithSearchAfterResults searchWithSearchAfterResults = mock(SearchWithSearchAfterResults.class); - when(searchWithSearchAfterResults.getNextSearchAfter()).thenReturn(Collections.singletonList(UUID.randomUUID().toString())); + final List firstPageSearchAfter = Collections.singletonList(UUID.randomUUID().toString()); + final Event testEvent1 = mock(Event.class); final Event testEvent2 = mock(Event.class); final Event testEvent3 = mock(Event.class); @@ -298,11 +315,22 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa when(objectMapper.writeValueAsBytes(testData1)).thenReturn(new byte[10]); when(objectMapper.writeValueAsBytes(testData2)).thenReturn(new byte[20]); when(objectMapper.writeValueAsBytes(testData3)).thenReturn(new byte[30]); - when(searchWithSearchAfterResults.getDocuments()).thenReturn(List.of(testEvent1, testEvent2)).thenReturn(List.of(testEvent1, testEvent2)) - .thenReturn(List.of(testEvent3)).thenReturn(List.of(testEvent3)); final ArgumentCaptor searchPointInTimeRequestArgumentCaptor = ArgumentCaptor.forClass(SearchPointInTimeRequest.class); - when(searchAccessor.searchWithPit(searchPointInTimeRequestArgumentCaptor.capture())).thenReturn(searchWithSearchAfterResults); + when(searchAccessor.searchWithPit(searchPointInTimeRequestArgumentCaptor.capture())).thenAnswer(invocation -> { + final SearchPointInTimeRequest request = invocation.getArgument(0); + if (request.getSearchAfter() == null) { + final SearchWithSearchAfterResults results = mock(SearchWithSearchAfterResults.class); + when(results.getNextSearchAfter()).thenReturn(firstPageSearchAfter); + when(results.getDocuments()).thenReturn(List.of(testEvent1, testEvent2)); + return results; + } else { + final SearchWithSearchAfterResults results = mock(SearchWithSearchAfterResults.class); + when(results.getNextSearchAfter()).thenReturn(null); + when(results.getDocuments()).thenReturn(List.of(testEvent3)); + return results; + } + }); doNothing().when(bufferAccumulator).add(any(Record.class)); doNothing().when(bufferAccumulator).flush(); @@ -350,7 +378,7 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa assertThat(searchPointInTimeRequestList.get(1).getPitId(), equalTo(pitId)); assertThat(searchPointInTimeRequestList.get(1).getKeepAlive(), equalTo(EXTEND_KEEP_ALIVE_TIME)); assertThat(searchPointInTimeRequestList.get(1).getPaginationSize(), equalTo(2)); - assertThat(searchPointInTimeRequestList.get(1).getSearchAfter(), equalTo(searchWithSearchAfterResults.getNextSearchAfter())); + assertThat(searchPointInTimeRequestList.get(1).getSearchAfter(), equalTo(firstPageSearchAfter)); final DeletePointInTimeRequest deletePointInTimeRequest = deleteRequestArgumentCaptor.getValue(); @@ -391,7 +419,15 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); final SearchWithSearchAfterResults searchWithSearchAfterResults = mock(SearchWithSearchAfterResults.class); - when(searchWithSearchAfterResults.getNextSearchAfter()).thenReturn(Collections.singletonList(UUID.randomUUID().toString())); + final List firstPageSearchAfter = Collections.singletonList(UUID.randomUUID().toString()); + // getNextSearchAfter is called ~3 times while processing the first page and once at the + // start of the second page. Returning a non-null value for those first 3 calls drives the + // second search_after request to use the captured cursor; null afterwards terminates. + when(searchWithSearchAfterResults.getNextSearchAfter()) + .thenReturn(firstPageSearchAfter) + .thenReturn(firstPageSearchAfter) + .thenReturn(firstPageSearchAfter) + .thenReturn(null); final Event testEvent1 = mock(Event.class); final Event testEvent2 = mock(Event.class); final Event testEvent3 = mock(Event.class); @@ -454,6 +490,79 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create verifyNoInteractions(processingErrorsCounter); } + @Test + void run_continues_past_short_page_and_terminates_when_nextSearchAfter_is_null() throws Exception { + mockTimerCallable(); + + final SourcePartition sourcePartition = mock(SourcePartition.class); + final String partitionKey = UUID.randomUUID().toString(); + when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); + when(sourcePartition.getPartitionState()).thenReturn(Optional.empty()); + + final String pitId = UUID.randomUUID().toString(); + final CreatePointInTimeResponse createPointInTimeResponse = mock(CreatePointInTimeResponse.class); + when(createPointInTimeResponse.getPitId()).thenReturn(pitId); + when(searchAccessor.createPit(any(CreatePointInTimeRequest.class))).thenReturn(createPointInTimeResponse); + + final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); + when(searchConfiguration.getBatchSize()).thenReturn(2); + when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); + + final List firstPageSearchAfter = Collections.singletonList(UUID.randomUUID().toString()); + + final Event testEvent1 = mock(Event.class); + final Event testEvent2 = mock(Event.class); + final JsonNode testData1 = mock(JsonNode.class); + final JsonNode testData2 = mock(JsonNode.class); + when(testEvent1.getJsonNode()).thenReturn(testData1); + when(testEvent2.getJsonNode()).thenReturn(testData2); + when(objectMapper.writeValueAsBytes(testData1)).thenReturn(new byte[10]); + when(objectMapper.writeValueAsBytes(testData2)).thenReturn(new byte[20]); + + // First page returns a SHORT page (1 doc < batch_size=2) with a cursor — pagination must continue. + // Second page returns null nextSearchAfter to terminate. + when(searchAccessor.searchWithPit(any(SearchPointInTimeRequest.class))).thenAnswer(invocation -> { + final SearchPointInTimeRequest request = invocation.getArgument(0); + final SearchWithSearchAfterResults results = mock(SearchWithSearchAfterResults.class); + if (request.getSearchAfter() == null) { + when(results.getNextSearchAfter()).thenReturn(firstPageSearchAfter); + when(results.getDocuments()).thenReturn(List.of(testEvent1)); + return results; + } else { + when(results.getNextSearchAfter()).thenReturn(null); + when(results.getDocuments()).thenReturn(List.of(testEvent2)); + return results; + } + }); + + doNothing().when(bufferAccumulator).add(any(Record.class)); + doNothing().when(bufferAccumulator).flush(); + doNothing().when(searchAccessor).deletePit(any(DeletePointInTimeRequest.class)); + + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)) + .thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); + + final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); + when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1); + when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO); + when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); + + doNothing().when(sourceCoordinator).closePartition(partitionKey, Duration.ZERO, 1, false); + + final Future future = executorService.submit(() -> createObjectUnderTest().run()); + await().atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> verify(indicesProcessedCounter).increment()); + + executorService.shutdown(); + future.cancel(true); + assertThat(executorService.awaitTermination(1, TimeUnit.SECONDS), equalTo(true)); + + // Key assertion: 2 search requests were made (past the short page). + // The old logic (size == batchSize) would have stopped after the short page. + verify(searchAccessor, times(2)).searchWithPit(any(SearchPointInTimeRequest.class)); + verify(documentsProcessedCounter, times(2)).increment(); + } + @Test void run_gives_up_partitions_and_waits_when_createPit_throws_SearchContextLimitException() throws Exception { mockTimerCallable(); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java index 09349579e4..1ab71eab2a 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java @@ -38,6 +38,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; import java.time.Duration; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -186,8 +187,12 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scro when(objectMapper.writeValueAsBytes(testData3)).thenReturn(new byte[30]); when(objectMapper.writeValueAsBytes(testData4)).thenReturn(new byte[40]); when(objectMapper.writeValueAsBytes(testData5)).thenReturn(new byte[50]); - when(searchScrollResponse.getDocuments()).thenReturn(List.of(testEvent3, testEvent4)) - .thenReturn(List.of(testEvent3, testEvent4)).thenReturn(List.of(testEvent5)).thenReturn(List.of(testEvent5)); + // The scroll worker calls getDocuments() twice per iteration (once to write to the buffer, + // once in shouldKeepScrolling). Duplicate each page's return to match that usage. + when(searchScrollResponse.getDocuments()) + .thenReturn(List.of(testEvent3, testEvent4)).thenReturn(List.of(testEvent3, testEvent4)) + .thenReturn(List.of(testEvent5)).thenReturn(List.of(testEvent5)) + .thenReturn(Collections.emptyList()).thenReturn(Collections.emptyList()); final ArgumentCaptor searchScrollRequestArgumentCaptor = ArgumentCaptor.forClass(SearchScrollRequest.class); when(searchAccessor.searchWithScroll(searchScrollRequestArgumentCaptor.capture())).thenReturn(searchScrollResponse); @@ -223,11 +228,11 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scro assertThat(createScrollRequest.getIndex(), equalTo(partitionKey)); assertThat(createScrollRequest.getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH)); - verify(searchAccessor, times(2)).searchWithScroll(any(SearchScrollRequest.class)); + verify(searchAccessor, times(3)).searchWithScroll(any(SearchScrollRequest.class)); verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), eq(null)); final List searchScrollRequests = searchScrollRequestArgumentCaptor.getAllValues(); - assertThat(searchScrollRequests.size(), equalTo(2)); + assertThat(searchScrollRequests.size(), equalTo(3)); assertThat(searchScrollRequests.get(0), notNullValue()); assertThat(searchScrollRequests.get(0).getScrollId(), equalTo(scrollId)); assertThat(searchScrollRequests.get(0).getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH)); @@ -236,6 +241,10 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scro assertThat(searchScrollRequests.get(1).getScrollId(), equalTo(scrollId)); assertThat(searchScrollRequests.get(1).getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH)); + assertThat(searchScrollRequests.get(2), notNullValue()); + assertThat(searchScrollRequests.get(2).getScrollId(), equalTo(scrollId)); + assertThat(searchScrollRequests.get(2).getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH)); + final DeleteScrollRequest deleteScrollRequest = deleteRequestArgumentCaptor.getValue(); assertThat(deleteScrollRequest, notNullValue()); @@ -311,8 +320,12 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a when(objectMapper.writeValueAsBytes(testData3)).thenReturn(new byte[30]); when(objectMapper.writeValueAsBytes(testData4)).thenReturn(new byte[40]); when(objectMapper.writeValueAsBytes(testData5)).thenReturn(new byte[50]); - when(searchScrollResponse.getDocuments()).thenReturn(List.of(testEvent3, testEvent4)) - .thenReturn(List.of(testEvent3, testEvent4)).thenReturn(List.of(testEvent5)).thenReturn(List.of(testEvent5)); + // The scroll worker calls getDocuments() twice per iteration (once to write to the buffer, + // once in shouldKeepScrolling). Duplicate each page's return to match that usage. + when(searchScrollResponse.getDocuments()) + .thenReturn(List.of(testEvent3, testEvent4)).thenReturn(List.of(testEvent3, testEvent4)) + .thenReturn(List.of(testEvent5)).thenReturn(List.of(testEvent5)) + .thenReturn(Collections.emptyList()).thenReturn(Collections.emptyList()); final ArgumentCaptor searchScrollRequestArgumentCaptor = ArgumentCaptor.forClass(SearchScrollRequest.class); when(searchAccessor.searchWithScroll(searchScrollRequestArgumentCaptor.capture())).thenReturn(searchScrollResponse); @@ -348,11 +361,11 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a assertThat(createScrollRequest.getIndex(), equalTo(partitionKey)); assertThat(createScrollRequest.getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH)); - verify(searchAccessor, times(2)).searchWithScroll(any(SearchScrollRequest.class)); + verify(searchAccessor, times(3)).searchWithScroll(any(SearchScrollRequest.class)); verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), eq(null)); final List searchScrollRequests = searchScrollRequestArgumentCaptor.getAllValues(); - assertThat(searchScrollRequests.size(), equalTo(2)); + assertThat(searchScrollRequests.size(), equalTo(3)); assertThat(searchScrollRequests.get(0), notNullValue()); assertThat(searchScrollRequests.get(0).getScrollId(), equalTo(scrollId)); assertThat(searchScrollRequests.get(0).getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH)); @@ -361,6 +374,10 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a assertThat(searchScrollRequests.get(1).getScrollId(), equalTo(scrollId)); assertThat(searchScrollRequests.get(1).getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH)); + assertThat(searchScrollRequests.get(2), notNullValue()); + assertThat(searchScrollRequests.get(2).getScrollId(), equalTo(scrollId)); + assertThat(searchScrollRequests.get(2).getScrollTime(), equalTo(SCROLL_TIME_PER_BATCH)); + final DeleteScrollRequest deleteScrollRequest = deleteRequestArgumentCaptor.getValue(); assertThat(deleteScrollRequest, notNullValue()); @@ -383,6 +400,231 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a verifyNoInteractions(processingErrorsCounter); } + @Test + void run_continues_scrolling_past_short_page_and_terminates_on_empty_page() throws Exception { + mockTimerCallable(); + + final SourcePartition sourcePartition = mock(SourcePartition.class); + final String partitionKey = UUID.randomUUID().toString(); + when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); + when(sourcePartition.getPartitionState()).thenReturn(Optional.empty()); + + final String scrollId = UUID.randomUUID().toString(); + final CreateScrollResponse createScrollResponse = mock(CreateScrollResponse.class); + when(createScrollResponse.getScrollId()).thenReturn(scrollId); + + final Event testEvent1 = mock(Event.class); + final Event testEvent2 = mock(Event.class); + final Event testEvent3 = mock(Event.class); + final JsonNode testData1 = mock(JsonNode.class); + final JsonNode testData2 = mock(JsonNode.class); + final JsonNode testData3 = mock(JsonNode.class); + when(testEvent1.getJsonNode()).thenReturn(testData1); + when(testEvent2.getJsonNode()).thenReturn(testData2); + when(testEvent3.getJsonNode()).thenReturn(testData3); + when(objectMapper.writeValueAsBytes(testData1)).thenReturn(new byte[10]); + when(objectMapper.writeValueAsBytes(testData2)).thenReturn(new byte[20]); + when(objectMapper.writeValueAsBytes(testData3)).thenReturn(new byte[30]); + + // Initial scroll returns a full page (2 docs, batch_size=2) + when(createScrollResponse.getDocuments()).thenReturn(List.of(testEvent1, testEvent2)); + when(searchAccessor.createScroll(any(CreateScrollRequest.class))).thenReturn(createScrollResponse); + + final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); + when(searchConfiguration.getBatchSize()).thenReturn(2); + when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); + + // Subsequent scroll pages: first returns a SHORT page (1 doc < batch_size), + // which must NOT terminate scrolling. The next page returns empty to terminate. + final SearchScrollResponse shortPageResponse = mock(SearchScrollResponse.class); + when(shortPageResponse.getScrollId()).thenReturn(scrollId); + when(shortPageResponse.getDocuments()).thenReturn(List.of(testEvent3)); + + final SearchScrollResponse emptyPageResponse = mock(SearchScrollResponse.class); + when(emptyPageResponse.getDocuments()).thenReturn(Collections.emptyList()); + + when(searchAccessor.searchWithScroll(any(SearchScrollRequest.class))) + .thenReturn(shortPageResponse) + .thenReturn(emptyPageResponse); + + doNothing().when(bufferAccumulator).add(any(Record.class)); + doNothing().when(bufferAccumulator).flush(); + doNothing().when(searchAccessor).deleteScroll(any(DeleteScrollRequest.class)); + + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)) + .thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); + + final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); + when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1); + when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO); + when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); + + doNothing().when(sourceCoordinator).closePartition(partitionKey, Duration.ZERO, 1, false); + + final Future future = executorService.submit(() -> createObjectUnderTest().run()); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(future.isCancelled(), equalTo(true)); + assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); + + // Key assertion: 2 scroll requests were made (past the short page), not just 1. + // The old logic (size == batchSize) would have stopped after the short page. + verify(searchAccessor, times(2)).searchWithScroll(any(SearchScrollRequest.class)); + verify(documentsProcessedCounter, times(3)).increment(); + verify(indicesProcessedCounter).increment(); + } + + @Test + void run_retries_scroll_failures_and_succeeds_when_fewer_than_max_consecutive() throws Exception { + mockTimerCallable(); + + final SourcePartition sourcePartition = mock(SourcePartition.class); + final String partitionKey = UUID.randomUUID().toString(); + when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); + when(sourcePartition.getPartitionState()).thenReturn(Optional.empty()); + + final String scrollId = UUID.randomUUID().toString(); + final CreateScrollResponse createScrollResponse = mock(CreateScrollResponse.class); + when(createScrollResponse.getScrollId()).thenReturn(scrollId); + final Event testEvent = mock(Event.class); + final JsonNode testData = mock(JsonNode.class); + when(testEvent.getJsonNode()).thenReturn(testData); + when(objectMapper.writeValueAsBytes(testData)).thenReturn(new byte[10]); + when(createScrollResponse.getDocuments()).thenReturn(List.of(testEvent)); + when(searchAccessor.createScroll(any(CreateScrollRequest.class))).thenReturn(createScrollResponse); + + final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); + when(searchConfiguration.getBatchSize()).thenReturn(2); + when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); + + final Counter searchRequestsFailedCounter = mock(Counter.class); + when(openSearchSourcePluginMetrics.getSearchRequestsFailedCounter()).thenReturn(searchRequestsFailedCounter); + + // First scroll call fails, second succeeds with empty page (terminates) + final SearchScrollResponse emptyResponse = mock(SearchScrollResponse.class); + when(emptyResponse.getDocuments()).thenReturn(Collections.emptyList()); + when(searchAccessor.searchWithScroll(any(SearchScrollRequest.class))) + .thenThrow(new RuntimeException("transient failure")) + .thenReturn(emptyResponse); + + doNothing().when(bufferAccumulator).add(any(Record.class)); + doNothing().when(bufferAccumulator).flush(); + doNothing().when(searchAccessor).deleteScroll(any(DeleteScrollRequest.class)); + + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)) + .thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); + + final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); + when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1); + when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO); + when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); + doNothing().when(sourceCoordinator).closePartition(partitionKey, Duration.ZERO, 1, false); + + final Future future = executorService.submit(() -> createObjectUnderTest().run()); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); + + verify(searchRequestsFailedCounter).increment(); + verify(searchAccessor, times(2)).searchWithScroll(any(SearchScrollRequest.class)); + verify(indicesProcessedCounter).increment(); + } + + @Test + void run_gives_up_after_max_consecutive_scroll_failures() throws Exception { + mockTimerCallable(); + + final SourcePartition sourcePartition = mock(SourcePartition.class); + final String partitionKey = UUID.randomUUID().toString(); + when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); + when(sourcePartition.getPartitionState()).thenReturn(Optional.empty()); + + final String scrollId = UUID.randomUUID().toString(); + final CreateScrollResponse createScrollResponse = mock(CreateScrollResponse.class); + when(createScrollResponse.getScrollId()).thenReturn(scrollId); + final Event testEvent = mock(Event.class); + final JsonNode testData = mock(JsonNode.class); + when(testEvent.getJsonNode()).thenReturn(testData); + when(objectMapper.writeValueAsBytes(testData)).thenReturn(new byte[10]); + when(createScrollResponse.getDocuments()).thenReturn(List.of(testEvent)); + when(searchAccessor.createScroll(any(CreateScrollRequest.class))).thenReturn(createScrollResponse); + + final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); + when(searchConfiguration.getBatchSize()).thenReturn(2); + when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); + + final Counter searchRequestsFailedCounter = mock(Counter.class); + when(openSearchSourcePluginMetrics.getSearchRequestsFailedCounter()).thenReturn(searchRequestsFailedCounter); + + // All scroll calls fail — exceeds MAX_CONSECUTIVE_SCROLL_FAILURES + when(searchAccessor.searchWithScroll(any(SearchScrollRequest.class))) + .thenThrow(new RuntimeException("persistent failure")); + + doNothing().when(bufferAccumulator).add(any(Record.class)); + doNothing().when(searchAccessor).deleteScroll(any(DeleteScrollRequest.class)); + + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)) + .thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); + + final Future future = executorService.submit(() -> createObjectUnderTest().run()); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); + + verify(searchRequestsFailedCounter, times(3)).increment(); + verify(searchAccessor, times(3)).searchWithScroll(any(SearchScrollRequest.class)); + verify(searchAccessor).deleteScroll(any(DeleteScrollRequest.class)); + verify(sourceCoordinator).giveUpPartition(partitionKey); + verify(processingErrorsCounter).increment(); + } + + @Test + void run_immediately_aborts_scroll_on_SearchContextLimitException_without_retry() throws Exception { + mockTimerCallable(); + + final SourcePartition sourcePartition = mock(SourcePartition.class); + final String partitionKey = UUID.randomUUID().toString(); + when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); + when(sourcePartition.getPartitionState()).thenReturn(Optional.empty()); + + final String scrollId = UUID.randomUUID().toString(); + final CreateScrollResponse createScrollResponse = mock(CreateScrollResponse.class); + when(createScrollResponse.getScrollId()).thenReturn(scrollId); + final Event testEvent = mock(Event.class); + final JsonNode testData = mock(JsonNode.class); + when(testEvent.getJsonNode()).thenReturn(testData); + when(objectMapper.writeValueAsBytes(testData)).thenReturn(new byte[10]); + when(createScrollResponse.getDocuments()).thenReturn(List.of(testEvent)); + when(searchAccessor.createScroll(any(CreateScrollRequest.class))).thenReturn(createScrollResponse); + + final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class); + when(searchConfiguration.getBatchSize()).thenReturn(2); + when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration); + + when(searchAccessor.searchWithScroll(any(SearchScrollRequest.class))) + .thenThrow(new SearchContextLimitException("limit exceeded")); + + doNothing().when(bufferAccumulator).add(any(Record.class)); + doNothing().when(searchAccessor).deleteScroll(any(DeleteScrollRequest.class)); + + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)) + .thenReturn(Optional.of(sourcePartition)); + + final Future future = executorService.submit(() -> createObjectUnderTest().run()); + Thread.sleep(100); + executorService.shutdown(); + future.cancel(true); + assertThat(executorService.awaitTermination(100, TimeUnit.MILLISECONDS), equalTo(true)); + + // Only 1 scroll attempt — no retries for SearchContextLimitException + verify(searchAccessor, times(1)).searchWithScroll(any(SearchScrollRequest.class)); + verify(searchAccessor).deleteScroll(any(DeleteScrollRequest.class)); + verify(sourceCoordinator).giveUpPartition(partitionKey); + } + @Test void run_gives_up_partitions_and_waits_when_createScroll_throws_SearchContextLimitException() throws Exception { mockTimerCallable(); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtilsCompletionTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtilsCompletionTest.java index fa70f99fdd..f54a54c4c0 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtilsCompletionTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtilsCompletionTest.java @@ -23,6 +23,9 @@ import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; + +import io.micrometer.core.instrument.Counter; import java.time.Duration; import java.util.UUID; @@ -31,6 +34,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -56,6 +60,8 @@ public class WorkerCommonUtilsCompletionTest { @Mock private AcknowledgementSet acknowledgementSet; + private OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; + private String partitionKey; @BeforeEach @@ -64,6 +70,8 @@ void setup() { lenient().when(indexPartition.getPartitionKey()).thenReturn(partitionKey); lenient().when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()) .thenReturn(schedulingParameterConfiguration); + openSearchSourcePluginMetrics = mock(OpenSearchSourcePluginMetrics.class); + lenient().when(openSearchSourcePluginMetrics.getIndicesCompletedWithFailuresCounter()).thenReturn(mock(Counter.class)); } @Test @@ -74,7 +82,7 @@ void completeIndexPartition_in_periodic_mode_without_acknowledgments_calls_close when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1); WorkerCommonUtils.completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet, - indexPartition, sourceCoordinator); + indexPartition, sourceCoordinator, openSearchSourcePluginMetrics); verify(sourceCoordinator).closePartition(eq(partitionKey), eq(Duration.ofHours(8)), eq(1), eq(false)); verify(sourceCoordinator, never()).completePartition(eq(partitionKey), eq(false)); @@ -86,7 +94,7 @@ void completeIndexPartition_in_single_scan_mode_without_acknowledgments_calls_co when(openSearchSourceConfiguration.isSingleScanMode()).thenReturn(true); WorkerCommonUtils.completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet, - indexPartition, sourceCoordinator); + indexPartition, sourceCoordinator, openSearchSourcePluginMetrics); verify(sourceCoordinator).completePartition(eq(partitionKey), eq(false)); verify(sourceCoordinator, never()).closePartition(eq(partitionKey), any(Duration.class), any(Integer.class), any(Boolean.class)); @@ -97,7 +105,7 @@ void completeIndexPartition_with_acknowledgments_delegates_to_ack_path_regardles when(openSearchSourceConfiguration.isAcknowledgmentsEnabled()).thenReturn(true); WorkerCommonUtils.completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet, - indexPartition, sourceCoordinator); + indexPartition, sourceCoordinator, openSearchSourcePluginMetrics); verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(eq(partitionKey), any(Duration.class)); verify(acknowledgementSet).complete(); @@ -163,4 +171,61 @@ void createAcknowledgmentSet_on_failed_ack_gives_up_partition_regardless_of_mode verify(sourceCoordinator, never()).completePartition(eq(partitionKey), any(Boolean.class)); verify(sourceCoordinator, never()).closePartition(eq(partitionKey), any(Duration.class), any(Integer.class), any(Boolean.class)); } + + @Test + void completeIndexPartition_with_failures_recorded_in_progress_state_increments_completed_with_failures_counter() { + when(openSearchSourceConfiguration.isAcknowledgmentsEnabled()).thenReturn(false); + when(openSearchSourceConfiguration.isSingleScanMode()).thenReturn(true); + + final OpenSearchIndexProgressState progressState = new OpenSearchIndexProgressState(); + progressState.setHadSearchFailures(true); + progressState.recordRequestFailure(new RuntimeException("boom")); + when(indexPartition.getPartitionState()).thenReturn(java.util.Optional.of(progressState)); + + final Counter indicesCompletedWithFailuresCounter = mock(Counter.class); + when(openSearchSourcePluginMetrics.getIndicesCompletedWithFailuresCounter()) + .thenReturn(indicesCompletedWithFailuresCounter); + + WorkerCommonUtils.completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet, + indexPartition, sourceCoordinator, openSearchSourcePluginMetrics); + + verify(indicesCompletedWithFailuresCounter).increment(); + verify(sourceCoordinator).completePartition(eq(partitionKey), eq(false)); + } + + @Test + void completeIndexPartition_without_failures_does_not_increment_completed_with_failures_counter() { + when(openSearchSourceConfiguration.isAcknowledgmentsEnabled()).thenReturn(false); + when(openSearchSourceConfiguration.isSingleScanMode()).thenReturn(true); + + final OpenSearchIndexProgressState progressState = new OpenSearchIndexProgressState(); + when(indexPartition.getPartitionState()).thenReturn(java.util.Optional.of(progressState)); + + final Counter indicesCompletedWithFailuresCounter = mock(Counter.class); + lenient().when(openSearchSourcePluginMetrics.getIndicesCompletedWithFailuresCounter()) + .thenReturn(indicesCompletedWithFailuresCounter); + + WorkerCommonUtils.completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet, + indexPartition, sourceCoordinator, openSearchSourcePluginMetrics); + + verify(indicesCompletedWithFailuresCounter, never()).increment(); + verify(sourceCoordinator).completePartition(eq(partitionKey), eq(false)); + } + + @Test + void completeIndexPartition_without_progress_state_does_not_attempt_failure_summary() { + when(openSearchSourceConfiguration.isAcknowledgmentsEnabled()).thenReturn(false); + when(openSearchSourceConfiguration.isSingleScanMode()).thenReturn(true); + + when(indexPartition.getPartitionState()).thenReturn(java.util.Optional.empty()); + + final Counter indicesCompletedWithFailuresCounter = mock(Counter.class); + lenient().when(openSearchSourcePluginMetrics.getIndicesCompletedWithFailuresCounter()) + .thenReturn(indicesCompletedWithFailuresCounter); + + WorkerCommonUtils.completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet, + indexPartition, sourceCoordinator, openSearchSourcePluginMetrics); + + verify(indicesCompletedWithFailuresCounter, never()).increment(); + } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtilsTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtilsTest.java index b6452e34e9..d20ed82a21 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtilsTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtilsTest.java @@ -5,14 +5,31 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker; +import io.micrometer.core.instrument.Counter; import org.hamcrest.Matchers; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchShardStatistics; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.stream.Stream; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.MAX_BACKOFF; public class WorkerCommonUtilsTest { @@ -39,4 +56,139 @@ private static Stream retryCountToExpectedBackoffRange() { ); } + @Test + void hasMorePages_returns_false_when_results_is_null() { + assertThat(WorkerCommonUtils.hasMorePages(null), equalTo(false)); + } + + @Test + void hasMorePages_returns_false_when_nextSearchAfter_is_null() { + final SearchWithSearchAfterResults results = SearchWithSearchAfterResults.builder() + .withDocuments(List.of(mock(Event.class))) + .withNextSearchAfter(null) + .build(); + + assertThat(WorkerCommonUtils.hasMorePages(results), equalTo(false)); + } + + @Test + void hasMorePages_returns_false_when_documents_list_is_empty() { + final SearchWithSearchAfterResults results = SearchWithSearchAfterResults.builder() + .withDocuments(Collections.emptyList()) + .withNextSearchAfter(List.of("cursor")) + .build(); + + assertThat(WorkerCommonUtils.hasMorePages(results), equalTo(false)); + } + + @Test + void hasMorePages_returns_false_when_documents_is_null() { + final SearchWithSearchAfterResults results = SearchWithSearchAfterResults.builder() + .withDocuments(null) + .withNextSearchAfter(List.of("cursor")) + .build(); + + assertThat(WorkerCommonUtils.hasMorePages(results), equalTo(false)); + } + + @Test + void hasMorePages_returns_true_when_documents_present_and_nextSearchAfter_present() { + final SearchWithSearchAfterResults results = SearchWithSearchAfterResults.builder() + .withDocuments(List.of(mock(Event.class))) + .withNextSearchAfter(List.of("cursor")) + .build(); + + assertThat(WorkerCommonUtils.hasMorePages(results), equalTo(true)); + } + + @Test + void hasMorePages_returns_true_for_short_page_when_nextSearchAfter_present() { + final SearchWithSearchAfterResults results = SearchWithSearchAfterResults.builder() + .withDocuments(List.of(mock(Event.class))) + .withNextSearchAfter(List.of("cursor")) + .build(); + + assertThat(WorkerCommonUtils.hasMorePages(results), equalTo(true)); + } + + @Test + void recordShardFailuresIfAny_is_no_op_when_stats_is_null() { + final OpenSearchIndexProgressState progressState = new OpenSearchIndexProgressState(); + final OpenSearchSourcePluginMetrics metrics = mock(OpenSearchSourcePluginMetrics.class); + + WorkerCommonUtils.recordShardFailuresIfAny("test-index", null, progressState, metrics); + + assertThat(progressState.isHadSearchFailures(), is(false)); + } + + @Test + void recordShardFailuresIfAny_is_no_op_when_stats_has_no_failures() { + final OpenSearchIndexProgressState progressState = new OpenSearchIndexProgressState(); + final OpenSearchSourcePluginMetrics metrics = mock(OpenSearchSourcePluginMetrics.class); + + WorkerCommonUtils.recordShardFailuresIfAny("test-index", SearchShardStatistics.empty(), progressState, metrics); + + assertThat(progressState.isHadSearchFailures(), is(false)); + } + + @Test + void recordShardFailuresIfAny_increments_counter_and_records_to_progress_state() { + final OpenSearchIndexProgressState progressState = new OpenSearchIndexProgressState(); + final OpenSearchSourcePluginMetrics metrics = mock(OpenSearchSourcePluginMetrics.class); + final Counter shardsFailedCounter = mock(Counter.class); + when(metrics.getSearchShardsFailedCounter()).thenReturn(shardsFailedCounter); + + final Map reasons = new LinkedHashMap<>(); + reasons.put("shard_failure: timeout", 3L); + final SearchShardStatistics stats = new SearchShardStatistics(5, 2, 3, 0, reasons); + + WorkerCommonUtils.recordShardFailuresIfAny("test-index", stats, progressState, metrics); + + verify(shardsFailedCounter).increment(3); + assertThat(progressState.isHadSearchFailures(), is(true)); + assertThat(progressState.getFailureReasonCounts().get("shard_failure: timeout"), equalTo(3L)); + } + + @Test + void recordShardFailuresIfAny_handles_null_metrics_gracefully() { + final OpenSearchIndexProgressState progressState = new OpenSearchIndexProgressState(); + + final Map reasons = new LinkedHashMap<>(); + reasons.put("reason", 1L); + final SearchShardStatistics stats = new SearchShardStatistics(5, 4, 1, 0, reasons); + + WorkerCommonUtils.recordShardFailuresIfAny("test-index", stats, progressState, null); + + assertThat(progressState.isHadSearchFailures(), is(true)); + } + + @Test + void recordShardFailuresIfAny_handles_null_progress_state_gracefully() { + final OpenSearchSourcePluginMetrics metrics = mock(OpenSearchSourcePluginMetrics.class); + final Counter shardsFailedCounter = mock(Counter.class); + when(metrics.getSearchShardsFailedCounter()).thenReturn(shardsFailedCounter); + + final Map reasons = new LinkedHashMap<>(); + reasons.put("reason", 1L); + final SearchShardStatistics stats = new SearchShardStatistics(5, 4, 1, 0, reasons); + + WorkerCommonUtils.recordShardFailuresIfAny("test-index", stats, null, metrics); + + verify(shardsFailedCounter).increment(1); + } + + @Test + void recordShardFailuresIfAny_does_not_increment_counter_when_failed_is_zero_but_reasons_present() { + final OpenSearchSourcePluginMetrics metrics = mock(OpenSearchSourcePluginMetrics.class); + final Counter shardsFailedCounter = mock(Counter.class); + when(metrics.getSearchShardsFailedCounter()).thenReturn(shardsFailedCounter); + + final Map reasons = new LinkedHashMap<>(); + reasons.put("reason", 1L); + final SearchShardStatistics stats = new SearchShardStatistics(5, 5, 0, 0, reasons); + + WorkerCommonUtils.recordShardFailuresIfAny("test-index", stats, new OpenSearchIndexProgressState(), metrics); + + verify(shardsFailedCounter, never()).increment(0); + } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchShardStatisticsTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchShardStatisticsTest.java new file mode 100644 index 0000000000..4d598e492e --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchShardStatisticsTest.java @@ -0,0 +1,307 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class SearchShardStatisticsTest { + + @Test + void empty_returns_the_same_shared_instance_with_zero_counts() { + final SearchShardStatistics empty = SearchShardStatistics.empty(); + + assertThat(empty, sameInstance(SearchShardStatistics.empty())); + assertThat(empty.getTotal(), equalTo(0)); + assertThat(empty.getSuccessful(), equalTo(0)); + assertThat(empty.getFailed(), equalTo(0)); + assertThat(empty.getSkipped(), equalTo(0)); + assertThat(empty.getFailureReasonCounts().isEmpty(), is(true)); + assertThat(empty.hasFailures(), is(false)); + } + + @Test + void constructor_with_null_map_produces_empty_counts() { + final SearchShardStatistics stats = new SearchShardStatistics(5, 4, 1, 0, null); + + assertThat(stats.getFailureReasonCounts(), equalTo(Collections.emptyMap())); + assertThat(stats.hasFailures(), is(true)); + } + + @Test + void hasFailures_is_true_when_failed_is_zero_but_reasons_are_present() { + final Map counts = new LinkedHashMap<>(); + counts.put("ConnectTimeout: timed out", 1L); + final SearchShardStatistics stats = new SearchShardStatistics(5, 5, 0, 0, counts); + + assertThat(stats.hasFailures(), is(true)); + } + + @Test + void getFailureReasonCounts_returns_unmodifiable_view() { + final Map counts = new LinkedHashMap<>(); + counts.put("reason", 1L); + final SearchShardStatistics stats = new SearchShardStatistics(1, 0, 1, 0, counts); + + assertThrows(UnsupportedOperationException.class, + () -> stats.getFailureReasonCounts().put("other", 1L)); + } + + @Test + void normalizeReason_single_arg_returns_unknown_for_null_or_empty() { + assertThat(SearchShardStatistics.normalizeReason((String) null), equalTo("unknown")); + assertThat(SearchShardStatistics.normalizeReason(""), equalTo("unknown")); + assertThat(SearchShardStatistics.normalizeReason(" "), equalTo("unknown")); + } + + @Test + void normalizeReason_strips_shard_ids_node_ids_and_uuids() { + final String raw = "shard failure on [my-index][3] at node[data-1a2b3c4d] with trace " + + "123e4567-e89b-12d3-a456-426614174000 failed"; + + final String normalized = SearchShardStatistics.normalizeReason(raw); + + assertThat(normalized, containsString("shard failure on [shard] at node[?] with trace failed")); + assertThat(normalized, not(containsString("[my-index][3]"))); + assertThat(normalized, not(containsString("data-1a2b3c4d"))); + assertThat(normalized, not(containsString("123e4567"))); + } + + @Test + void normalizeReason_strips_ip_addresses_and_ports() { + final String raw = "connection refused to 10.0.1.5:9300 from node 192.168.1.100"; + + final String normalized = SearchShardStatistics.normalizeReason(raw); + + assertThat(normalized, equalTo("connection refused to from node ")); + assertThat(normalized, not(containsString("10.0.1.5"))); + assertThat(normalized, not(containsString("192.168.1.100"))); + } + + @Test + void normalizeReason_two_arg_joins_type_and_message() { + final String normalized = SearchShardStatistics.normalizeReason("shard_failure", "timed out"); + + assertThat(normalized, equalTo("shard_failure: timed out")); + } + + @Test + void normalizeReason_two_arg_handles_null_components() { + assertThat(SearchShardStatistics.normalizeReason(null, null), equalTo("unknown")); + assertThat(SearchShardStatistics.normalizeReason("type_only", null), equalTo("type_only")); + assertThat(SearchShardStatistics.normalizeReason(null, "message_only"), equalTo("message_only")); + } + + @Test + void incrementFailureReasonCount_adds_new_key_and_sums_existing() { + final Map counts = new LinkedHashMap<>(); + + SearchShardStatistics.incrementFailureReasonCount(counts, "timeout", 1L); + SearchShardStatistics.incrementFailureReasonCount(counts, "timeout", 2L); + SearchShardStatistics.incrementFailureReasonCount(counts, "connect_refused", 5L); + + assertThat(counts.get("timeout"), equalTo(3L)); + assertThat(counts.get("connect_refused"), equalTo(5L)); + assertThat(counts.size(), equalTo(2)); + } + + @Test + void incrementFailureReasonCount_ignores_null_reason_and_non_positive_delta() { + final Map counts = new LinkedHashMap<>(); + + SearchShardStatistics.incrementFailureReasonCount(counts, null, 3L); + SearchShardStatistics.incrementFailureReasonCount(counts, "reason", 0L); + SearchShardStatistics.incrementFailureReasonCount(counts, "reason", -1L); + + assertThat(counts.isEmpty(), is(true)); + } + + @Test + void incrementFailureReasonCount_rejects_null_counts_map() { + assertThrows(NullPointerException.class, + () -> SearchShardStatistics.incrementFailureReasonCount(null, "reason", 1L)); + } + + @Test + void incrementFailureReasonCount_folds_overflow_into_other_bucket_once_cap_is_reached() { + final Map counts = new LinkedHashMap<>(); + + for (int i = 0; i < SearchShardStatistics.MAX_DISTINCT_REASONS; i++) { + SearchShardStatistics.incrementFailureReasonCount(counts, "reason-" + i, 1L); + } + // these all exceed the cap and should fold into the overflow bucket + SearchShardStatistics.incrementFailureReasonCount(counts, "overflow-a", 2L); + SearchShardStatistics.incrementFailureReasonCount(counts, "overflow-b", 3L); + SearchShardStatistics.incrementFailureReasonCount(counts, "overflow-c", 4L); + + assertThat(counts.size(), equalTo(SearchShardStatistics.MAX_DISTINCT_REASONS + 1)); + assertThat(counts.get(SearchShardStatistics.OVERFLOW_REASON_KEY), equalTo(9L)); + assertThat(counts.get("overflow-a"), nullValue()); + } + + @Test + void incrementFailureReasonCount_still_increments_existing_keys_after_cap_is_reached() { + final Map counts = new LinkedHashMap<>(); + for (int i = 0; i < SearchShardStatistics.MAX_DISTINCT_REASONS; i++) { + SearchShardStatistics.incrementFailureReasonCount(counts, "reason-" + i, 1L); + } + + // already-known key still increments even after cap is hit + SearchShardStatistics.incrementFailureReasonCount(counts, "reason-0", 10L); + SearchShardStatistics.incrementFailureReasonCount(counts, "new-key", 1L); + + assertThat(counts.get("reason-0"), equalTo(11L)); + assertThat(counts.get(SearchShardStatistics.OVERFLOW_REASON_KEY), equalTo(1L)); + } + + @Test + void mergeFailureReasonCounts_merges_maps_and_honors_cap() { + final Map destination = new LinkedHashMap<>(); + SearchShardStatistics.incrementFailureReasonCount(destination, "shared", 2L); + + final Map source = new LinkedHashMap<>(); + source.put("shared", 3L); + source.put("new", 5L); + + SearchShardStatistics.mergeFailureReasonCounts(destination, source); + + assertThat(destination.get("shared"), equalTo(5L)); + assertThat(destination.get("new"), equalTo(5L)); + } + + @Test + void mergeFailureReasonCounts_is_a_no_op_for_null_or_empty_source() { + final Map counts = new LinkedHashMap<>(); + counts.put("reason", 1L); + + SearchShardStatistics.mergeFailureReasonCounts(counts, null); + SearchShardStatistics.mergeFailureReasonCounts(counts, Collections.emptyMap()); + + assertThat(counts.size(), equalTo(1)); + assertThat(counts.get("reason"), equalTo(1L)); + } + + @Test + void equals_and_hashCode_compare_all_fields() { + final Map counts = new LinkedHashMap<>(); + counts.put("a", 1L); + + final SearchShardStatistics a = new SearchShardStatistics(5, 4, 1, 0, counts); + final SearchShardStatistics b = new SearchShardStatistics(5, 4, 1, 0, counts); + final SearchShardStatistics c = new SearchShardStatistics(5, 4, 1, 0, Collections.emptyMap()); + + assertThat(a, equalTo(b)); + assertThat(a.hashCode(), equalTo(b.hashCode())); + assertThat(a.equals(c), is(false)); + } + + @Test + void numberOrZero_returns_zero_for_null() { + assertThat(SearchShardStatistics.numberOrZero(null), equalTo(0)); + } + + @Test + void numberOrZero_returns_int_value_for_non_null() { + assertThat(SearchShardStatistics.numberOrZero(5), equalTo(5)); + assertThat(SearchShardStatistics.numberOrZero(3L), equalTo(3)); + assertThat(SearchShardStatistics.numberOrZero(7.9), equalTo(7)); + } + + @Test + void fromShardCounts_with_null_failures_produces_empty_reason_map() { + final SearchShardStatistics stats = SearchShardStatistics.fromShardCounts(10, 8, 2, 0, null); + + assertThat(stats.getTotal(), equalTo(10)); + assertThat(stats.getSuccessful(), equalTo(8)); + assertThat(stats.getFailed(), equalTo(2)); + assertThat(stats.getSkipped(), equalTo(0)); + assertThat(stats.getFailureReasonCounts().isEmpty(), is(true)); + } + + @Test + void fromShardCounts_with_empty_failures_list_produces_empty_reason_map() { + final SearchShardStatistics stats = SearchShardStatistics.fromShardCounts(5, 5, 0, 0, Collections.emptyList()); + + assertThat(stats.getFailureReasonCounts().isEmpty(), is(true)); + assertThat(stats.hasFailures(), is(false)); + } + + @Test + void fromShardCounts_with_null_shard_counts_defaults_to_zero() { + final SearchShardStatistics stats = SearchShardStatistics.fromShardCounts(null, null, null, null, null); + + assertThat(stats.getTotal(), equalTo(0)); + assertThat(stats.getSuccessful(), equalTo(0)); + assertThat(stats.getFailed(), equalTo(0)); + assertThat(stats.getSkipped(), equalTo(0)); + } + + @Test + void fromShardCounts_aggregates_failure_reasons_from_type_message_pairs() { + final List failures = Arrays.asList( + new String[]{"shard_failure", "timed out"}, + new String[]{"shard_failure", "timed out"}, + new String[]{"connect_exception", "connection refused"} + ); + + final SearchShardStatistics stats = SearchShardStatistics.fromShardCounts(5, 2, 3, 0, failures); + + assertThat(stats.getFailed(), equalTo(3)); + assertThat(stats.getFailureReasonCounts().get("shard_failure: timed out"), equalTo(2L)); + assertThat(stats.getFailureReasonCounts().get("connect_exception: connection refused"), equalTo(1L)); + assertThat(stats.getFailureReasonCounts().size(), equalTo(2)); + } + + @Test + void fromShardCounts_handles_null_entries_in_failures_list() { + final List failures = Arrays.asList( + null, + new String[]{}, + new String[]{null}, + new String[]{"type_only"} + ); + + final SearchShardStatistics stats = SearchShardStatistics.fromShardCounts(4, 0, 4, 0, failures); + + // null entry, empty array, and array with only null type all normalize to "unknown" + assertThat(stats.getFailureReasonCounts().containsKey("unknown"), is(true)); + assertThat(stats.getFailureReasonCounts().get("unknown"), equalTo(3L)); + assertThat(stats.getFailureReasonCounts().get("type_only"), equalTo(1L)); + } + + @Test + void fromShardCounts_respects_cap_when_many_distinct_failures() { + final List failures = new java.util.ArrayList<>(); + for (int i = 0; i < SearchShardStatistics.MAX_DISTINCT_REASONS + 5; i++) { + failures.add(new String[]{"type_" + i, "message_" + i}); + } + + final SearchShardStatistics stats = SearchShardStatistics.fromShardCounts(30, 5, 25, 0, failures); + + // 20 distinct keys + 1 overflow bucket + assertThat(stats.getFailureReasonCounts().size(), equalTo(SearchShardStatistics.MAX_DISTINCT_REASONS + 1)); + assertThat(stats.getFailureReasonCounts().get(SearchShardStatistics.OVERFLOW_REASON_KEY), equalTo(5L)); + } + +}