Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@
package org.opensearch.dataprepper.plugins.source.opensearch;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchShardStatistics;

import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class OpenSearchIndexProgressState {
Expand All @@ -27,6 +31,14 @@ public class OpenSearchIndexProgressState {
@JsonInclude(JsonInclude.Include.NON_NULL)
private List<String> searchAfter;

/**
 * Flag mirrored from the failure aggregator's {@code hadFailures()} value.
 * Serialized as {@code had_search_failures}; the {@code NON_DEFAULT} inclusion rule
 * leaves it out of the JSON while it still holds its default value.
 */
@JsonProperty("had_search_failures")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
private boolean hadSearchFailures;

/**
 * Failure counts keyed by reason string, kept in insertion order.
 * Serialized as {@code failure_reason_counts}; the {@code NON_EMPTY} inclusion rule
 * leaves it out of the JSON while the map is empty.
 */
@JsonProperty("failure_reason_counts")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
private Map<String, Long> failureReasonCounts = new LinkedHashMap<>();

/**
 * Creates a progress state without assigning anything: the body is empty, so every
 * field keeps its declared initial value.
 */
public OpenSearchIndexProgressState() {

}
Expand All @@ -35,11 +47,15 @@ public OpenSearchIndexProgressState() {
public OpenSearchIndexProgressState(@JsonProperty("pit_id") final String pitId,
@JsonProperty("pit_creation_time") final Long pitCreationTime,
@JsonProperty("pit_keep_alive") final Long pitKeepAlive,
@JsonProperty("pit_search_after") final List<String> searchAfter) {
@JsonProperty("pit_search_after") final List<String> searchAfter,
@JsonProperty("had_search_failures") final boolean hadSearchFailures,
@JsonProperty("failure_reason_counts") final Map<String, Long> failureReasonCounts) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that we have a @JsonProperty on the constructor, but not on the getters or fields. How does this work now?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add it on the field.

this.pitId = pitId;
this.pitCreationTime = pitCreationTime;
this.keepAlive = pitKeepAlive;
this.searchAfter = searchAfter;
this.hadSearchFailures = hadSearchFailures;
this.failureReasonCounts = failureReasonCounts == null ? new LinkedHashMap<>() : new LinkedHashMap<>(failureReasonCounts);
}

public List<String> getSearchAfter() {
Expand Down Expand Up @@ -74,6 +90,49 @@ public void setKeepAlive(final Long keepAlive) {
this.keepAlive = keepAlive;
}

/**
 * Lazily created helper that accumulates failure information; built on demand by
 * {@code getOrCreateAggregator()} from the current flag and counts.
 * It is marked {@code @JsonIgnore} and {@code transient}, and the setters reset it to
 * {@code null} so that the next access rebuilds it from the updated field values.
 */
@JsonIgnore
private transient ShardFailureAggregator failureAggregator;

/**
 * Returns the cached aggregator, constructing it on first use from the current
 * {@code hadSearchFailures} flag and {@code failureReasonCounts} map.
 *
 * @return the aggregator backing this state; never {@code null}
 */
private ShardFailureAggregator getOrCreateAggregator() {
    if (failureAggregator != null) {
        return failureAggregator;
    }
    failureAggregator = new ShardFailureAggregator(hadSearchFailures, failureReasonCounts);
    return failureAggregator;
}

/**
 * Copies the aggregator's flag and a fresh snapshot of its counts back into the
 * fields of this state. Callers in this class create the aggregator before calling this.
 */
private void syncFromAggregator() {
    final ShardFailureAggregator current = failureAggregator;
    hadSearchFailures = current.hadFailures();
    final Map<String, Long> snapshot = new LinkedHashMap<>(current.getFailureReasonCounts());
    failureReasonCounts = snapshot;
}

/**
 * Reports the failure flag as seen by the aggregator, creating the aggregator if needed.
 *
 * @return the aggregator's {@code hadFailures()} value
 */
public boolean isHadSearchFailures() {
    final ShardFailureAggregator aggregator = getOrCreateAggregator();
    return aggregator.hadFailures();
}

/**
 * Overwrites the failure flag and drops the cached aggregator so the next access
 * rebuilds it from the new value.
 *
 * @param hadSearchFailures the new flag value
 */
public void setHadSearchFailures(final boolean hadSearchFailures) {
    // Discard the cached helper first; it would otherwise keep reporting the old flag.
    failureAggregator = null;
    this.hadSearchFailures = hadSearchFailures;
}

/**
 * Returns the failure counts as exposed by the aggregator, creating the aggregator if needed.
 *
 * @return the map returned by the aggregator's {@code getFailureReasonCounts()}
 */
public Map<String, Long> getFailureReasonCounts() {
    final ShardFailureAggregator aggregator = getOrCreateAggregator();
    return aggregator.getFailureReasonCounts();
}

/**
 * Replaces the stored counts with an insertion-ordered copy of the given map and drops
 * the cached aggregator so the next access rebuilds it from the new counts.
 *
 * @param failureReasonCounts the counts to copy; {@code null} is stored as an empty map
 */
public void setFailureReasonCounts(final Map<String, Long> failureReasonCounts) {
    final Map<String, Long> copy = new LinkedHashMap<>();
    if (failureReasonCounts != null) {
        copy.putAll(failureReasonCounts);
    }
    this.failureReasonCounts = copy;
    this.failureAggregator = null;
}

/**
 * Hands the shard statistics to the aggregator, then copies the aggregator's flag
 * and counts back into this state's fields.
 *
 * @param shardStatistics the statistics passed through to the aggregator
 */
public void recordShardFailures(final SearchShardStatistics shardStatistics) {
    final ShardFailureAggregator aggregator = getOrCreateAggregator();
    aggregator.recordShardFailures(shardStatistics);
    syncFromAggregator();
}

/**
 * Hands the throwable to the aggregator, then copies the aggregator's flag
 * and counts back into this state's fields.
 *
 * @param throwable the failure passed through to the aggregator
 */
public void recordRequestFailure(final Throwable throwable) {
    final ShardFailureAggregator aggregator = getOrCreateAggregator();
    aggregator.recordRequestFailure(throwable);
    syncFromAggregator();
}

public boolean hasValidPointInTime() {
return Objects.nonNull(pitId) && Objects.nonNull(pitCreationTime) && Objects.nonNull(keepAlive)
&& Instant.ofEpochMilli(pitCreationTime + keepAlive).isAfter(Instant.now());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.source.opensearch;

import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchShardStatistics;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Accumulates search-failure information: a "had failures" flag, which this class only
 * ever sets to {@code true}, and a count per failure reason.
 *
 * <p>A new reason key is added only while the map holds fewer than
 * {@link #MAX_DISTINCT_REASONS} entries. After that, counts for unseen reasons are added to
 * {@link #OVERFLOW_REASON_KEY}, while reasons already present keep being incremented in place.
 */
class ShardFailureAggregator {

    static final int MAX_DISTINCT_REASONS = 20;
    static final String OVERFLOW_REASON_KEY = "__other__";

    private boolean hadFailures;
    private final Map<String, Long> failureReasonCounts;

    /** Creates an aggregator with no failures recorded and no counts. */
    ShardFailureAggregator() {
        this(false, null);
    }

    /**
     * Creates an aggregator seeded with existing state.
     *
     * @param hadFailures initial value of the failure flag
     * @param counts      initial counts, copied in iteration order; {@code null} means empty
     */
    ShardFailureAggregator(final boolean hadFailures, final Map<String, Long> counts) {
        this.hadFailures = hadFailures;
        if (counts == null) {
            this.failureReasonCounts = new LinkedHashMap<>();
        } else {
            this.failureReasonCounts = new LinkedHashMap<>(counts);
        }
    }

    /**
     * Folds one page's shard statistics into the aggregate. Does nothing when the
     * statistics are {@code null} or report no failures.
     *
     * @param pageStats the shard statistics to merge, may be {@code null}
     */
    void recordShardFailures(final SearchShardStatistics pageStats) {
        if (pageStats != null && pageStats.hasFailures()) {
            hadFailures = true;
            mergeFailureReasonCounts(pageStats.getFailureReasonCounts());
        }
    }

    /**
     * Records a single failed request. The reason key is {@code "unknown"} for a
     * {@code null} throwable; otherwise it is the normalized form of the throwable's simple
     * class name, followed by {@code ": <message>"} when a message is present.
     *
     * @param throwable the failure cause, may be {@code null}
     */
    void recordRequestFailure(final Throwable throwable) {
        hadFailures = true;
        if (throwable == null) {
            increment("unknown", 1L);
            return;
        }
        final String detail = throwable.getMessage();
        final String suffix = (detail == null) ? "" : ": " + detail;
        final String description = throwable.getClass().getSimpleName() + suffix;
        increment(SearchShardStatistics.normalizeReason(description), 1L);
    }

    /** @return the current value of the failure flag */
    boolean hadFailures() {
        return hadFailures;
    }

    /** @return a read-only view of the counts held by this aggregator */
    Map<String, Long> getFailureReasonCounts() {
        return Collections.unmodifiableMap(failureReasonCounts);
    }

    /** Adds every entry of {@code toMerge} to the running counts; a {@code null} count is treated as zero. */
    private void mergeFailureReasonCounts(final Map<String, Long> toMerge) {
        if (toMerge == null) {
            return;
        }
        toMerge.forEach((reason, count) -> increment(reason, count == null ? 0L : count));
    }

    /**
     * Adds {@code delta} to the count for {@code key}, redirecting to the overflow key when
     * {@code key} is not yet present and the distinct-reason cap has been reached.
     * Calls with a {@code null} key or a non-positive delta are ignored.
     */
    private void increment(final String key, final long delta) {
        if (key == null || delta <= 0) {
            return;
        }
        final boolean alreadyTracked = failureReasonCounts.containsKey(key);
        final boolean hasRoom = failureReasonCounts.size() < MAX_DISTINCT_REASONS;
        // Known reasons always accumulate in place; unseen ones get their own slot only while there is room.
        final String bucket = (alreadyTracked || hasRoom) ? key : OVERFLOW_REASON_KEY;
        failureReasonCounts.merge(bucket, delta, Long::sum);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ public class OpenSearchSourcePluginMetrics {
// Metric name the constructor registers as a distribution summary.
static final String BYTES_PROCESSED = "bytesProcessed";
// Metric names the constructor registers as counters.
static final String CREDENTIALS_CHANGED = "credentialsChanged";
static final String CLIENT_REFRESH_ERRORS = "clientRefreshErrors";
static final String SEARCH_REQUESTS_FAILED = "searchRequestsFailed";
static final String SEARCH_SHARDS_FAILED = "searchShardsFailed";
static final String INDICES_COMPLETED_WITH_FAILURES = "indicesCompletedWithFailures";

private final Counter documentsProcessedCounter;
private final Counter indicesProcessedCounter;
Expand All @@ -30,6 +33,9 @@ public class OpenSearchSourcePluginMetrics {
private final DistributionSummary bytesProcessedSummary;
private final Counter credentialsChangeCounter;
private final Counter clientRefreshErrorsCounter;
private final Counter searchRequestsFailedCounter;
private final Counter searchShardsFailedCounter;
private final Counter indicesCompletedWithFailuresCounter;

public static OpenSearchSourcePluginMetrics create(final PluginMetrics pluginMetrics) {
return new OpenSearchSourcePluginMetrics(pluginMetrics);
Expand All @@ -44,6 +50,9 @@ private OpenSearchSourcePluginMetrics(final PluginMetrics pluginMetrics) {
bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED);
clientRefreshErrorsCounter = pluginMetrics.counter(CLIENT_REFRESH_ERRORS);
searchRequestsFailedCounter = pluginMetrics.counter(SEARCH_REQUESTS_FAILED);
searchShardsFailedCounter = pluginMetrics.counter(SEARCH_SHARDS_FAILED);
indicesCompletedWithFailuresCounter = pluginMetrics.counter(INDICES_COMPLETED_WITH_FAILURES);
}

public Counter getDocumentsProcessedCounter() {
Expand Down Expand Up @@ -77,4 +86,16 @@ public Counter getCredentialsChangeCounter() {
/**
 * @return the counter registered under the {@code clientRefreshErrors} metric name
 */
public Counter getClientRefreshErrorsCounter() {
return clientRefreshErrorsCounter;
}

/**
 * @return the counter registered under the {@code searchRequestsFailed} metric name
 */
public Counter getSearchRequestsFailedCounter() {
return searchRequestsFailedCounter;
}

/**
 * @return the counter registered under the {@code searchShardsFailed} metric name
 */
public Counter getSearchShardsFailedCounter() {
return searchShardsFailedCounter;
}

/**
 * @return the counter registered under the {@code indicesCompletedWithFailures} metric name
 */
public Counter getIndicesCompletedWithFailuresCounter() {
return indicesCompletedWithFailuresCounter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.hasMorePages;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.recordShardFailuresIfAny;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME;

Expand Down Expand Up @@ -103,7 +105,7 @@ public void run() {
openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet));

completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet,
indexPartition.get(), sourceCoordinator);
indexPartition.get(), sourceCoordinator, openSearchSourcePluginMetrics);

openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment();
LOG.info("Completed processing for index: '{}'", indexPartition.get().getPartitionKey());
Expand Down Expand Up @@ -178,15 +180,19 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op

openSearchIndexProgressState.setSearchAfter(searchWithSearchAfterResults.getNextSearchAfter());

recordShardFailuresIfAny(indexName, searchWithSearchAfterResults.getShardStatistics(), openSearchIndexProgressState, openSearchSourcePluginMetrics);

if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
LOG.debug("Renew ownership of index {}", indexName);
sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
lastCheckpointTime = System.currentTimeMillis();
}
} while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize());
} while (hasMorePages(searchWithSearchAfterResults));

LOG.info("Received {} documents in latest search request, and batch size is {}, exiting pagination",
searchWithSearchAfterResults.getDocuments().size(), searchConfiguration.getBatchSize());
LOG.info("Reached end of index '{}' (last page returned {} documents, nextSearchAfter present: {}).",
indexName,
searchWithSearchAfterResults.getDocuments().size(),
searchWithSearchAfterResults.getNextSearchAfter() != null);

try {
bufferAccumulator.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.hasMorePages;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.recordShardFailuresIfAny;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME;

Expand Down Expand Up @@ -116,7 +118,7 @@ public void run() {
openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet));

completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet,
indexPartition.get(), sourceCoordinator);
indexPartition.get(), sourceCoordinator, openSearchSourcePluginMetrics);

openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment();
} catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) {
Expand Down Expand Up @@ -216,15 +218,19 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
openSearchIndexProgressState.setSearchAfter(searchWithSearchAfterResults.getNextSearchAfter());
openSearchIndexProgressState.setKeepAlive(Duration.ofMillis(openSearchIndexProgressState.getKeepAlive()).plus(EXTEND_KEEP_ALIVE_DURATION).toMillis());

recordShardFailuresIfAny(indexName, searchWithSearchAfterResults.getShardStatistics(), openSearchIndexProgressState, openSearchSourcePluginMetrics);

if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
LOG.debug("Renew ownership of index {}", indexName);
sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
lastCheckpointTime = System.currentTimeMillis();
}
} while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize());
} while (hasMorePages(searchWithSearchAfterResults));

LOG.info("Received {} documents in latest search request, and batch size is {}, exiting pagination",
searchWithSearchAfterResults.getDocuments().size(), searchConfiguration.getBatchSize());
LOG.info("Reached end of index '{}' (last page returned {} documents, nextSearchAfter present: {}).",
indexName,
searchWithSearchAfterResults.getDocuments().size(),
searchWithSearchAfterResults.getNextSearchAfter() != null);

try {
bufferAccumulator.flush();
Expand Down
Loading
Loading