Skip to content

Commit 9d6873f

Browse files
committed
Fix unecessary logging for query manager with 0 terms
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent 5a17afc commit 9d6873f

3 files changed

Lines changed: 7 additions & 2 deletions

File tree

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,7 @@ private Counter getDocumentStatusCounter(final int status) {
452452

453453
private boolean shouldSendAllForQuerying(final Exception exception) {
454454
if (exception != null && existingDocumentQueryManager != null) {
455+
LOG.warn("Received exception that may result in querying for duplicate documents", exception);
455456
if (SOCKET_TIMEOUT_EXCEPTIONS.contains(exception.getClass())) {
456457
return true;
457458
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ExistingDocumentQueryManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void run() {
125125

126126
@VisibleForTesting
127127
void runQueryLoop() {
128-
if (!bulkOperationsWaitingForQuery.isEmpty()) {
128+
if (!bulkOperationsWaitingForQuery.isEmpty() && documentsCurrentlyBeingQueriedGauge.get() > 0) {
129129

130130
// Query for existing documents
131131
final MsearchRequest msearchRequest = buildMultiSearchRequest();
@@ -197,7 +197,7 @@ private MsearchRequest buildMultiSearchRequest() {
197197
m.searches(s -> s
198198
.header(h -> h.index(index))
199199
.body(b -> b
200-
.size(chunk.size())
200+
.size(chunk.size() * 2)
201201
.source(source -> source.filter(f -> f.includes(queryTerm)))
202202
.query(Query.of(q -> q
203203
.terms(TermsQuery.of(t -> t

data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ExistingDocumentQueryManagerTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ void add_bulk_operation_and_found_in_query_drops_and_releases_event() throws IOE
128128
final ExistingDocumentQueryManager objectUnderTest = createObjectUnderTest();
129129

130130
objectUnderTest.addBulkOperation(bulkOperationWrapper);
131+
when(documentsCurrentlyQueried.get()).thenReturn(1);
131132

132133
objectUnderTest.runQueryLoop();
133134

@@ -181,6 +182,8 @@ void add_bulk_operation_and_not_found_in_query_returns_as_ready_to_ingest() thro
181182

182183
objectUnderTest.addBulkOperation(bulkOperationWrapper);
183184

185+
when(documentsCurrentlyQueried.get()).thenReturn(1);
186+
184187
Thread.sleep(20);
185188

186189
objectUnderTest.runQueryLoop();
@@ -287,6 +290,7 @@ void query_response_with_two_documents_with_same_term_value_tracks_duplicate_doc
287290
final ExistingDocumentQueryManager objectUnderTest = createObjectUnderTest();
288291

289292
objectUnderTest.addBulkOperation(bulkOperationWrapper);
293+
when(documentsCurrentlyQueried.get()).thenReturn(1);
290294

291295
objectUnderTest.runQueryLoop();
292296

0 commit comments

Comments
 (0)