Skip to content

Commit cdbc51e

Browse files
committed
Fix OpenSearch source pagination to handle failures correctly
Pagination previously terminated whenever a page returned fewer documents than the configured batch_size, which silently dropped the rest of an index whenever a request hit partial shard failures or a transient error. The correct termination signal is used instead: nextSearchAfter == null / empty page for search_after and PIT workers, and an empty page for the scroll worker. Shard failures are now captured in a bounded map of normalized reason -> count (capped at 20 distinct keys with an "__other__" overflow bucket), persisted on OpenSearchIndexProgressState, surfaced as new counters (searchShardsFailed, searchRequestsFailed, indicesCompletedWithFailures), and logged per page plus once at index completion. The scroll worker no longer aborts an index on a single per-request exception; it tolerates up to MAX_CONSECUTIVE_SCROLL_FAILURES retries before giving up the partition. Signed-off-by: Keyur-S-Patel <keyurpatel.opensource@gmail.com> Fixes #6337
1 parent 513dcfb commit cdbc51e

19 files changed

Lines changed: 1279 additions & 55 deletions

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ private void doInitializeInternal() throws IOException {
160160

161161
// Create index with semantic enrichment via AWS control plane (AOSS or managed domain) if configured.
162162
new SemanticEnrichmentIndexManager(awsCredentialsSupplier).maybeCreateIndex(
163-
connectionConfiguration,
163+
openSearchSinkConfig.getConnectionConfiguration(),
164164
openSearchSinkConfig.getIndexConfiguration().getSemanticEnrichmentConfig(),
165165
openSearchSinkConfig.getIndexConfiguration().getSemanticEnrichmentResourceName(),
166166
configuredIndexAlias);

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchIndexProgressState.java

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@
88
import com.fasterxml.jackson.annotation.JsonCreator;
99
import com.fasterxml.jackson.annotation.JsonInclude;
1010
import com.fasterxml.jackson.annotation.JsonProperty;
11+
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchShardStatistics;
1112

1213
import java.time.Instant;
14+
import java.util.LinkedHashMap;
1315
import java.util.List;
16+
import java.util.Map;
1417
import java.util.Objects;
1518

1619
public class OpenSearchIndexProgressState {
@@ -27,6 +30,12 @@ public class OpenSearchIndexProgressState {
2730
@JsonInclude(JsonInclude.Include.NON_NULL)
2831
private List<String> searchAfter;
2932

33+
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
34+
private boolean hadSearchFailures;
35+
36+
@JsonInclude(JsonInclude.Include.NON_EMPTY)
37+
private Map<String, Long> failureReasonCounts = new LinkedHashMap<>();
38+
3039
public OpenSearchIndexProgressState() {
3140

3241
}
@@ -35,11 +44,15 @@ public OpenSearchIndexProgressState() {
3544
public OpenSearchIndexProgressState(@JsonProperty("pit_id") final String pitId,
3645
@JsonProperty("pit_creation_time") final Long pitCreationTime,
3746
@JsonProperty("pit_keep_alive") final Long pitKeepAlive,
38-
@JsonProperty("pit_search_after") final List<String> searchAfter) {
47+
@JsonProperty("pit_search_after") final List<String> searchAfter,
48+
@JsonProperty("had_search_failures") final boolean hadSearchFailures,
49+
@JsonProperty("failure_reason_counts") final Map<String, Long> failureReasonCounts) {
3950
this.pitId = pitId;
4051
this.pitCreationTime = pitCreationTime;
4152
this.keepAlive = pitKeepAlive;
4253
this.searchAfter = searchAfter;
54+
this.hadSearchFailures = hadSearchFailures;
55+
this.failureReasonCounts = failureReasonCounts == null ? new LinkedHashMap<>() : new LinkedHashMap<>(failureReasonCounts);
4356
}
4457

4558
public List<String> getSearchAfter() {
@@ -74,6 +87,59 @@ public void setKeepAlive(final Long keepAlive) {
7487
this.keepAlive = keepAlive;
7588
}
7689

90+
public boolean isHadSearchFailures() {
91+
return hadSearchFailures;
92+
}
93+
94+
public void setHadSearchFailures(final boolean hadSearchFailures) {
95+
this.hadSearchFailures = hadSearchFailures;
96+
}
97+
98+
public Map<String, Long> getFailureReasonCounts() {
99+
return failureReasonCounts;
100+
}
101+
102+
public void setFailureReasonCounts(final Map<String, Long> failureReasonCounts) {
103+
this.failureReasonCounts = failureReasonCounts == null ? new LinkedHashMap<>() : new LinkedHashMap<>(failureReasonCounts);
104+
}
105+
106+
/**
107+
* Record shard-level failures observed on a page. Sets the had-failures flag
108+
* and merges the per-response aggregated reason counts into the persisted
109+
* progress state, respecting the {@link SearchShardStatistics#MAX_DISTINCT_REASONS}
110+
* cap with an {@link SearchShardStatistics#OVERFLOW_REASON_KEY} overflow bucket.
111+
*/
112+
public void recordShardFailures(final SearchShardStatistics shardStatistics) {
113+
if (shardStatistics == null || !shardStatistics.hasFailures()) {
114+
return;
115+
}
116+
this.hadSearchFailures = true;
117+
if (this.failureReasonCounts == null) {
118+
this.failureReasonCounts = new LinkedHashMap<>();
119+
}
120+
SearchShardStatistics.mergeFailureReasonCounts(this.failureReasonCounts, shardStatistics.getFailureReasonCounts());
121+
}
122+
123+
/**
124+
* Record a per-request failure (e.g. a scroll page that threw). Normalizes
125+
* the exception into {@code type: message} and merges it into the aggregated
126+
* counts map, respecting the cap.
127+
*/
128+
public void recordRequestFailure(final Throwable throwable) {
129+
this.hadSearchFailures = true;
130+
if (this.failureReasonCounts == null) {
131+
this.failureReasonCounts = new LinkedHashMap<>();
132+
}
133+
final String key;
134+
if (throwable == null) {
135+
key = "unknown";
136+
} else {
137+
final String message = throwable.getMessage() == null ? "" : ": " + throwable.getMessage();
138+
key = SearchShardStatistics.normalizeReason(throwable.getClass().getSimpleName() + message);
139+
}
140+
SearchShardStatistics.incrementFailureReasonCount(this.failureReasonCounts, key, 1L);
141+
}
142+
77143
public boolean hasValidPointInTime() {
78144
return Objects.nonNull(pitId) && Objects.nonNull(pitCreationTime) && Objects.nonNull(keepAlive)
79145
&& Instant.ofEpochMilli(pitCreationTime + keepAlive).isAfter(Instant.now());

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ public class OpenSearchSourcePluginMetrics {
2020
static final String BYTES_PROCESSED = "bytesProcessed";
2121
static final String CREDENTIALS_CHANGED = "credentialsChanged";
2222
static final String CLIENT_REFRESH_ERRORS = "clientRefreshErrors";
23+
static final String SEARCH_REQUESTS_FAILED = "searchRequestsFailed";
24+
static final String SEARCH_SHARDS_FAILED = "searchShardsFailed";
25+
static final String INDICES_COMPLETED_WITH_FAILURES = "indicesCompletedWithFailures";
2326

2427
private final Counter documentsProcessedCounter;
2528
private final Counter indicesProcessedCounter;
@@ -30,6 +33,9 @@ public class OpenSearchSourcePluginMetrics {
3033
private final DistributionSummary bytesProcessedSummary;
3134
private final Counter credentialsChangeCounter;
3235
private final Counter clientRefreshErrorsCounter;
36+
private final Counter searchRequestsFailedCounter;
37+
private final Counter searchShardsFailedCounter;
38+
private final Counter indicesCompletedWithFailuresCounter;
3339

3440
public static OpenSearchSourcePluginMetrics create(final PluginMetrics pluginMetrics) {
3541
return new OpenSearchSourcePluginMetrics(pluginMetrics);
@@ -44,6 +50,9 @@ private OpenSearchSourcePluginMetrics(final PluginMetrics pluginMetrics) {
4450
bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
4551
credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED);
4652
clientRefreshErrorsCounter = pluginMetrics.counter(CLIENT_REFRESH_ERRORS);
53+
searchRequestsFailedCounter = pluginMetrics.counter(SEARCH_REQUESTS_FAILED);
54+
searchShardsFailedCounter = pluginMetrics.counter(SEARCH_SHARDS_FAILED);
55+
indicesCompletedWithFailuresCounter = pluginMetrics.counter(INDICES_COMPLETED_WITH_FAILURES);
4756
}
4857

4958
public Counter getDocumentsProcessedCounter() {
@@ -77,4 +86,16 @@ public Counter getCredentialsChangeCounter() {
7786
public Counter getClientRefreshErrorsCounter() {
7887
return clientRefreshErrorsCounter;
7988
}
89+
90+
public Counter getSearchRequestsFailedCounter() {
91+
return searchRequestsFailedCounter;
92+
}
93+
94+
public Counter getSearchShardsFailedCounter() {
95+
return searchShardsFailedCounter;
96+
}
97+
98+
public Counter getIndicesCompletedWithFailuresCounter() {
99+
return indicesCompletedWithFailuresCounter;
100+
}
80101
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter;
3838
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition;
3939
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet;
40+
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.hasMorePages;
41+
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.recordShardFailuresIfAny;
4042
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME;
4143
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME;
4244

@@ -103,7 +105,7 @@ public void run() {
103105
openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet));
104106

105107
completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet,
106-
indexPartition.get(), sourceCoordinator);
108+
indexPartition.get(), sourceCoordinator, openSearchSourcePluginMetrics);
107109

108110
openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment();
109111
LOG.info("Completed processing for index: '{}'", indexPartition.get().getPartitionKey());
@@ -178,15 +180,19 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
178180

179181
openSearchIndexProgressState.setSearchAfter(searchWithSearchAfterResults.getNextSearchAfter());
180182

183+
recordShardFailuresIfAny(indexName, searchWithSearchAfterResults.getShardStatistics(), openSearchIndexProgressState, openSearchSourcePluginMetrics);
184+
181185
if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
182186
LOG.debug("Renew ownership of index {}", indexName);
183187
sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
184188
lastCheckpointTime = System.currentTimeMillis();
185189
}
186-
} while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize());
190+
} while (hasMorePages(searchWithSearchAfterResults));
187191

188-
LOG.info("Received {} documents in latest search request, and batch size is {}, exiting pagination",
189-
searchWithSearchAfterResults.getDocuments().size(), searchConfiguration.getBatchSize());
192+
LOG.info("Reached end of index '{}' (last page returned {} documents, nextSearchAfter present: {}).",
193+
indexName,
194+
searchWithSearchAfterResults.getDocuments().size(),
195+
searchWithSearchAfterResults.getNextSearchAfter() != null);
190196

191197
try {
192198
bufferAccumulator.flush();

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.calculateExponentialBackoffAndJitter;
4242
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition;
4343
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet;
44+
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.hasMorePages;
45+
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.recordShardFailuresIfAny;
4446
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME;
4547
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME;
4648

@@ -116,7 +118,7 @@ public void run() {
116118
openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet));
117119

118120
completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet,
119-
indexPartition.get(), sourceCoordinator);
121+
indexPartition.get(), sourceCoordinator, openSearchSourcePluginMetrics);
120122

121123
openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment();
122124
} catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) {
@@ -216,15 +218,19 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
216218
openSearchIndexProgressState.setSearchAfter(searchWithSearchAfterResults.getNextSearchAfter());
217219
openSearchIndexProgressState.setKeepAlive(Duration.ofMillis(openSearchIndexProgressState.getKeepAlive()).plus(EXTEND_KEEP_ALIVE_DURATION).toMillis());
218220

221+
recordShardFailuresIfAny(indexName, searchWithSearchAfterResults.getShardStatistics(), openSearchIndexProgressState, openSearchSourcePluginMetrics);
222+
219223
if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
220224
LOG.debug("Renew ownership of index {}", indexName);
221225
sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
222226
lastCheckpointTime = System.currentTimeMillis();
223227
}
224-
} while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize());
228+
} while (hasMorePages(searchWithSearchAfterResults));
225229

226-
LOG.info("Received {} documents in latest search request, and batch size is {}, exiting pagination",
227-
searchWithSearchAfterResults.getDocuments().size(), searchConfiguration.getBatchSize());
230+
LOG.info("Reached end of index '{}' (last page returned {} documents, nextSearchAfter present: {}).",
231+
indexName,
232+
searchWithSearchAfterResults.getDocuments().size(),
233+
searchWithSearchAfterResults.getNextSearchAfter() != null);
228234

229235
try {
230236
bufferAccumulator.flush();

0 commit comments

Comments
 (0)