Skip to content

Commit 5004107

Browse files
committed
Refactor failure tracking and improve test coverage for OpenSearch source
- Add @JsonProperty on fields for explicit bidirectional JSON mapping - Extract ShardFailureAggregator for cleaner separation of concerns - Remove unused totalHits from SearchScrollResponse - Add IP:port normalization to SearchShardStatistics - Improve scroll failure log message to note possible skipped documents - Add unit tests for WorkerCommonUtils.hasMorePages - Add ScrollWorker test for short-page continuation (key fix behavior) - Replace brittle call-count mocking with per-page result objects in PitWorkerTest - Replace Thread.sleep with awaitility in NoSearchContextWorkerTest Signed-off-by: Keyur-S-Patel <keyurpatel.opensource@gmail.com>
1 parent cdbc51e commit 5004107

16 files changed

Lines changed: 993 additions & 77 deletions

File tree

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ private void doInitializeInternal() throws IOException {
160160

161161
// Create index with semantic enrichment via AWS control plane (AOSS or managed domain) if configured.
162162
new SemanticEnrichmentIndexManager(awsCredentialsSupplier).maybeCreateIndex(
163-
openSearchSinkConfig.getConnectionConfiguration(),
163+
connectionConfiguration,
164164
openSearchSinkConfig.getIndexConfiguration().getSemanticEnrichmentConfig(),
165165
openSearchSinkConfig.getIndexConfiguration().getSemanticEnrichmentResourceName(),
166166
configuredIndexAlias);

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchIndexProgressState.java

Lines changed: 26 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.dataprepper.plugins.source.opensearch;
77

88
import com.fasterxml.jackson.annotation.JsonCreator;
9+
import com.fasterxml.jackson.annotation.JsonIgnore;
910
import com.fasterxml.jackson.annotation.JsonInclude;
1011
import com.fasterxml.jackson.annotation.JsonProperty;
1112
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchShardStatistics;
@@ -30,9 +31,11 @@ public class OpenSearchIndexProgressState {
3031
@JsonInclude(JsonInclude.Include.NON_NULL)
3132
private List<String> searchAfter;
3233

34+
@JsonProperty("had_search_failures")
3335
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
3436
private boolean hadSearchFailures;
3537

38+
@JsonProperty("failure_reason_counts")
3639
@JsonInclude(JsonInclude.Include.NON_EMPTY)
3740
private Map<String, Long> failureReasonCounts = new LinkedHashMap<>();
3841

@@ -87,57 +90,47 @@ public void setKeepAlive(final Long keepAlive) {
8790
this.keepAlive = keepAlive;
8891
}
8992

93+
@JsonIgnore
94+
private transient ShardFailureAggregator failureAggregator;
95+
96+
private ShardFailureAggregator getOrCreateAggregator() {
97+
if (failureAggregator == null) {
98+
failureAggregator = new ShardFailureAggregator(hadSearchFailures, failureReasonCounts);
99+
}
100+
return failureAggregator;
101+
}
102+
103+
private void syncFromAggregator() {
104+
hadSearchFailures = failureAggregator.hadFailures();
105+
failureReasonCounts = new LinkedHashMap<>(failureAggregator.getFailureReasonCounts());
106+
}
107+
90108
public boolean isHadSearchFailures() {
91-
return hadSearchFailures;
109+
return getOrCreateAggregator().hadFailures();
92110
}
93111

94112
public void setHadSearchFailures(final boolean hadSearchFailures) {
95113
this.hadSearchFailures = hadSearchFailures;
114+
this.failureAggregator = null;
96115
}
97116

98117
public Map<String, Long> getFailureReasonCounts() {
99-
return failureReasonCounts;
118+
return getOrCreateAggregator().getFailureReasonCounts();
100119
}
101120

102121
public void setFailureReasonCounts(final Map<String, Long> failureReasonCounts) {
103122
this.failureReasonCounts = failureReasonCounts == null ? new LinkedHashMap<>() : new LinkedHashMap<>(failureReasonCounts);
123+
this.failureAggregator = null;
104124
}
105125

106-
/**
107-
* Record shard-level failures observed on a page. Sets the had-failures flag
108-
* and merges the per-response aggregated reason counts into the persisted
109-
* progress state, respecting the {@link SearchShardStatistics#MAX_DISTINCT_REASONS}
110-
* cap with an {@link SearchShardStatistics#OVERFLOW_REASON_KEY} overflow bucket.
111-
*/
112126
public void recordShardFailures(final SearchShardStatistics shardStatistics) {
113-
if (shardStatistics == null || !shardStatistics.hasFailures()) {
114-
return;
115-
}
116-
this.hadSearchFailures = true;
117-
if (this.failureReasonCounts == null) {
118-
this.failureReasonCounts = new LinkedHashMap<>();
119-
}
120-
SearchShardStatistics.mergeFailureReasonCounts(this.failureReasonCounts, shardStatistics.getFailureReasonCounts());
127+
getOrCreateAggregator().recordShardFailures(shardStatistics);
128+
syncFromAggregator();
121129
}
122130

123-
/**
124-
* Record a per-request failure (e.g. a scroll page that threw). Normalizes
125-
* the exception into {@code type: message} and merges it into the aggregated
126-
* counts map, respecting the cap.
127-
*/
128131
public void recordRequestFailure(final Throwable throwable) {
129-
this.hadSearchFailures = true;
130-
if (this.failureReasonCounts == null) {
131-
this.failureReasonCounts = new LinkedHashMap<>();
132-
}
133-
final String key;
134-
if (throwable == null) {
135-
key = "unknown";
136-
} else {
137-
final String message = throwable.getMessage() == null ? "" : ": " + throwable.getMessage();
138-
key = SearchShardStatistics.normalizeReason(throwable.getClass().getSimpleName() + message);
139-
}
140-
SearchShardStatistics.incrementFailureReasonCount(this.failureReasonCounts, key, 1L);
132+
getOrCreateAggregator().recordRequestFailure(throwable);
133+
syncFromAggregator();
141134
}
142135

143136
public boolean hasValidPointInTime() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.source.opensearch;
11+
12+
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchShardStatistics;
13+
14+
import java.util.Collections;
15+
import java.util.LinkedHashMap;
16+
import java.util.Map;
17+
18+
class ShardFailureAggregator {
19+
20+
static final int MAX_DISTINCT_REASONS = 20;
21+
static final String OVERFLOW_REASON_KEY = "__other__";
22+
23+
private boolean hadFailures;
24+
private final Map<String, Long> failureReasonCounts;
25+
26+
ShardFailureAggregator() {
27+
this.hadFailures = false;
28+
this.failureReasonCounts = new LinkedHashMap<>();
29+
}
30+
31+
ShardFailureAggregator(final boolean hadFailures, final Map<String, Long> counts) {
32+
this.hadFailures = hadFailures;
33+
this.failureReasonCounts = counts == null ? new LinkedHashMap<>() : new LinkedHashMap<>(counts);
34+
}
35+
36+
void recordShardFailures(final SearchShardStatistics pageStats) {
37+
if (pageStats == null || !pageStats.hasFailures()) {
38+
return;
39+
}
40+
this.hadFailures = true;
41+
mergeFailureReasonCounts(pageStats.getFailureReasonCounts());
42+
}
43+
44+
void recordRequestFailure(final Throwable throwable) {
45+
this.hadFailures = true;
46+
final String key;
47+
if (throwable == null) {
48+
key = "unknown";
49+
} else {
50+
final String message = throwable.getMessage() == null ? "" : ": " + throwable.getMessage();
51+
key = SearchShardStatistics.normalizeReason(throwable.getClass().getSimpleName() + message);
52+
}
53+
increment(key, 1L);
54+
}
55+
56+
boolean hadFailures() {
57+
return hadFailures;
58+
}
59+
60+
Map<String, Long> getFailureReasonCounts() {
61+
return Collections.unmodifiableMap(failureReasonCounts);
62+
}
63+
64+
private void mergeFailureReasonCounts(final Map<String, Long> toMerge) {
65+
if (toMerge == null || toMerge.isEmpty()) {
66+
return;
67+
}
68+
for (final Map.Entry<String, Long> entry : toMerge.entrySet()) {
69+
increment(entry.getKey(), entry.getValue() == null ? 0L : entry.getValue());
70+
}
71+
}
72+
73+
private void increment(final String key, final long delta) {
74+
if (key == null || delta <= 0) {
75+
return;
76+
}
77+
if (failureReasonCounts.containsKey(key)) {
78+
failureReasonCounts.merge(key, delta, Long::sum);
79+
return;
80+
}
81+
if (failureReasonCounts.size() < MAX_DISTINCT_REASONS) {
82+
failureReasonCounts.put(key, delta);
83+
return;
84+
}
85+
failureReasonCounts.merge(OVERFLOW_REASON_KEY, delta, Long::sum);
86+
}
87+
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,8 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
200200
consecutiveFailures++;
201201
openSearchSourcePluginMetrics.getSearchRequestsFailedCounter().increment();
202202
openSearchIndexProgressState.recordRequestFailure(e);
203-
LOG.warn("Scroll page failed for index '{}' ({}/{}). Continuing pagination.",
203+
LOG.warn("Scroll page failed for index '{}' ({}/{}). Some documents may have been skipped. " +
204+
"Continuing pagination with the next scroll page.",
204205
indexName, consecutiveFailures, MAX_CONSECUTIVE_SCROLL_FAILURES, e);
205206
if (consecutiveFailures >= MAX_CONSECUTIVE_SCROLL_FAILURES) {
206207
deleteScroll(createScrollResponse.getScrollId());

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,6 @@ public SearchScrollResponse searchWithScroll(final SearchScrollRequest searchScr
219219
.withScrollId(searchResponse.scrollId())
220220
.withDocuments(getDocumentsFromResponse(searchResponse))
221221
.withShardStatistics(toShardStatistics(searchResponse.shards()))
222-
.withTotalHits(extractTotalHits(searchResponse))
223222
.build();
224223
}
225224

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,6 @@ public SearchScrollResponse searchWithScroll(final SearchScrollRequest searchScr
213213
.withScrollId(searchResponse.scrollId())
214214
.withDocuments(getDocumentsFromResponse(searchResponse))
215215
.withShardStatistics(toShardStatistics(searchResponse.shards()))
216-
.withTotalHits(extractTotalHits(searchResponse))
217216
.build();
218217
}
219218

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchScrollResponse.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ public class SearchScrollResponse {
1313
private final String scrollId;
1414
private final List<Event> documents;
1515
private final SearchShardStatistics shardStatistics;
16-
private final Long totalHits;
1716

1817
public String getScrollId() {
1918
return scrollId;
@@ -27,15 +26,10 @@ public SearchShardStatistics getShardStatistics() {
2726
return shardStatistics == null ? SearchShardStatistics.empty() : shardStatistics;
2827
}
2928

30-
public Long getTotalHits() {
31-
return totalHits;
32-
}
33-
3429
private SearchScrollResponse(final SearchScrollResponse.Builder builder) {
3530
this.scrollId = builder.scrollId;
3631
this.documents = builder.documents;
3732
this.shardStatistics = builder.shardStatistics;
38-
this.totalHits = builder.totalHits;
3933
}
4034

4135
public static SearchScrollResponse.Builder builder() {
@@ -47,7 +41,6 @@ public static class Builder {
4741
private String scrollId;
4842
private List<Event> documents;
4943
private SearchShardStatistics shardStatistics;
50-
private Long totalHits;
5144

5245
public Builder() {
5346

@@ -68,11 +61,6 @@ public SearchScrollResponse.Builder withShardStatistics(final SearchShardStatist
6861
return this;
6962
}
7063

71-
public SearchScrollResponse.Builder withTotalHits(final Long totalHits) {
72-
this.totalHits = totalHits;
73-
return this;
74-
}
75-
7664
public SearchScrollResponse build() {
7765
return new SearchScrollResponse(this);
7866
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SearchShardStatistics.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ public class SearchShardStatistics {
3434
"\\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\\b");
3535
private static final Pattern SHARD_ID_PATTERN = Pattern.compile("\\[[^\\]]*\\]\\[\\d+\\]");
3636
private static final Pattern NODE_ID_PATTERN = Pattern.compile("node\\[[^\\]]+\\]");
37+
private static final Pattern IP_PORT_PATTERN = Pattern.compile(
38+
"\\b\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}(:\\d{1,5})?\\b");
3739
private static final int MAX_REASON_KEY_LENGTH = 512;
3840
private static final String UNKNOWN_REASON = "unknown";
3941

@@ -100,6 +102,7 @@ public static String normalizeReason(final String rawReason) {
100102
normalized = SHARD_ID_PATTERN.matcher(normalized).replaceAll("[shard]");
101103
normalized = NODE_ID_PATTERN.matcher(normalized).replaceAll("node[?]");
102104
normalized = UUID_PATTERN.matcher(normalized).replaceAll("<uuid>");
105+
normalized = IP_PORT_PATTERN.matcher(normalized).replaceAll("<ip>");
103106
normalized = normalized.trim();
104107
if (normalized.isEmpty()) {
105108
return UNKNOWN_REASON;

data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchIndexProgressStateTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,4 +149,57 @@ void recordRequestFailure_with_null_message_uses_class_name_only() {
149149
assertThat(state.isHadSearchFailures(), is(true));
150150
assertThat(state.getFailureReasonCounts().get("RuntimeException"), equalTo(1L));
151151
}
152+
153+
@Test
154+
void setHadSearchFailures_invalidates_aggregator_cache() {
155+
final OpenSearchIndexProgressState state = new OpenSearchIndexProgressState();
156+
state.recordRequestFailure(new RuntimeException("boom"));
157+
assertThat(state.isHadSearchFailures(), is(true));
158+
159+
state.setHadSearchFailures(false);
160+
161+
assertThat(state.isHadSearchFailures(), is(false));
162+
}
163+
164+
@Test
165+
void setFailureReasonCounts_invalidates_aggregator_and_uses_new_counts() {
166+
final OpenSearchIndexProgressState state = new OpenSearchIndexProgressState();
167+
state.recordRequestFailure(new RuntimeException("initial"));
168+
169+
final Map<String, Long> replacement = new LinkedHashMap<>();
170+
replacement.put("replaced", 99L);
171+
state.setFailureReasonCounts(replacement);
172+
173+
assertThat(state.getFailureReasonCounts().get("replaced"), equalTo(99L));
174+
assertThat(state.getFailureReasonCounts().get("RuntimeException: initial"), nullValue());
175+
}
176+
177+
@Test
178+
void recording_after_setter_accumulates_on_top_of_new_state() {
179+
final OpenSearchIndexProgressState state = new OpenSearchIndexProgressState();
180+
181+
final Map<String, Long> seed = new LinkedHashMap<>();
182+
seed.put("existing", 5L);
183+
state.setFailureReasonCounts(seed);
184+
state.setHadSearchFailures(true);
185+
186+
state.recordRequestFailure(new RuntimeException("new error"));
187+
188+
assertThat(state.getFailureReasonCounts().get("existing"), equalTo(5L));
189+
assertThat(state.getFailureReasonCounts().get("RuntimeException: new error"), equalTo(1L));
190+
}
191+
192+
@Test
193+
void jackson_constructor_with_pre_populated_data_allows_further_recording() {
194+
final Map<String, Long> existing = new LinkedHashMap<>();
195+
existing.put("prior_error", 10L);
196+
final OpenSearchIndexProgressState state = new OpenSearchIndexProgressState(
197+
null, null, null, null, true, existing);
198+
199+
state.recordRequestFailure(new RuntimeException("new"));
200+
201+
assertThat(state.isHadSearchFailures(), is(true));
202+
assertThat(state.getFailureReasonCounts().get("prior_error"), equalTo(10L));
203+
assertThat(state.getFailureReasonCounts().get("RuntimeException: new"), equalTo(1L));
204+
}
152205
}

0 commit comments

Comments
 (0)