Skip to content

Commit 6e14d2b

Browse files
committed
remove deletion of potential duplicate document
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent 8d4d13a commit 6e14d2b

2 files changed

Lines changed: 12 additions & 39 deletions

File tree

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ExistingDocumentQueryManager.java

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class ExistingDocumentQueryManager implements Runnable {
4848

4949
static final String QUERY_TIME = "queryDuplicatesTime";
5050

51-
static final String POTENTIAL_DUPLICATES_DELETED = "potentialDuplicatesDeleted";
51+
static final String POTENTIAL_DUPLICATES = "potentialDuplicates";
5252

5353
private final Counter eventsDroppedAndReleasedCounter;
5454

@@ -101,7 +101,7 @@ public ExistingDocumentQueryManager(final IndexConfiguration indexConfiguration,
101101
this.documentsCurrentlyBeingQueriedGauge = pluginMetrics.gauge(DOCUMENTS_CURRENTLY_BEING_QUERIED, documentsCurrentlyBeingQueried, AtomicInteger::get);
102102
this.duplicateEventsInQueryManager = pluginMetrics.counter(DUPLICATE_EVENTS_IN_QUERY_MANAGER);
103103
this.queryTimePerLoop = pluginMetrics.timer(QUERY_TIME);
104-
this.potentialDuplicatesDeleted = pluginMetrics.counter(POTENTIAL_DUPLICATES_DELETED);
104+
this.potentialDuplicatesDeleted = pluginMetrics.counter(POTENTIAL_DUPLICATES);
105105
this.lockReadyToIngest = new ReentrantLock();
106106
this.lockWaitingForQuery = new ReentrantLock();
107107
}
@@ -272,14 +272,14 @@ private void dropAndReleaseFoundEvents(final MsearchResponse<ObjectNode> msearch
272272
final Map<String, QueryManagerBulkOperation> bulkOperationsForIndex = bulkOperationsWaitingForQuery.get(indexForHit);
273273
final QueryManagerBulkOperation bulkOperationToRelease = bulkOperationsForIndex.get(queryTermValue);
274274
if (bulkOperationToRelease == null) {
275-
// Delete duplicate document
276-
LOG.warn("Bulk operation for term value {} with id {} is null, potentially a duplicate document, deleting", queryTermValue, hit.id());
275+
// Means two documents with the same query term value were found
276+
LOG.warn("Bulk operation for term value {} with id {} is null, potentially a duplicate document", queryTermValue, hit.id());
277277
potentialDuplicatesDeleted.increment();
278-
deleteDuplicateDocument(hit.index(), hit.id());
279278
} else {
280279
LOG.debug("Found document with query term {}, dropping and releasing Event handle", queryTermValue);
281280
bulkOperationToRelease.getBulkOperationWrapper().releaseEventHandle(true);
282281
eventsDroppedAndReleasedCounter.increment();
282+
documentsCurrentlyBeingQueriedGauge.decrementAndGet();
283283
bulkOperationsForIndex.remove(queryTermValue);
284284
}
285285
} finally {
@@ -289,16 +289,4 @@ private void dropAndReleaseFoundEvents(final MsearchResponse<ObjectNode> msearch
289289
}
290290
});
291291
}
292-
293-
private void deleteDuplicateDocument(final String index, final String documentId) {
294-
try {
295-
openSearchClient.delete(d -> d
296-
.index(index)
297-
.id(documentId)
298-
);
299-
LOG.warn("Deleted duplicate document with ID {} from index {}", documentId, index);
300-
} catch (Exception e) {
301-
LOG.error("Failed to delete duplicate document with ID {} from index {}: {}", documentId, index, e.getMessage(), e);
302-
}
303-
}
304292
}

data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ExistingDocumentQueryManagerTest.java

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,12 @@
1010
import org.mockito.Mock;
1111
import org.mockito.junit.jupiter.MockitoExtension;
1212
import org.opensearch.client.opensearch.OpenSearchClient;
13-
import org.opensearch.client.opensearch.core.DeleteRequest;
14-
import org.opensearch.client.opensearch.core.DeleteResponse;
1513
import org.opensearch.client.opensearch.core.MsearchRequest;
1614
import org.opensearch.client.opensearch.core.MsearchResponse;
1715
import org.opensearch.client.opensearch.core.msearch.MultiSearchItem;
1816
import org.opensearch.client.opensearch.core.msearch.MultiSearchResponseItem;
1917
import org.opensearch.client.opensearch.core.search.Hit;
2018
import org.opensearch.client.opensearch.core.search.HitsMetadata;
21-
import org.opensearch.client.util.ObjectBuilder;
2219
import org.opensearch.dataprepper.metrics.PluginMetrics;
2320
import org.opensearch.dataprepper.plugins.sink.opensearch.BulkOperationWrapper;
2421
import org.opensearch.dataprepper.test.helper.ReflectivelySetField;
@@ -30,7 +27,6 @@
3027
import java.util.Set;
3128
import java.util.UUID;
3229
import java.util.concurrent.atomic.AtomicInteger;
33-
import java.util.function.Function;
3430

3531
import static org.hamcrest.CoreMatchers.equalTo;
3632
import static org.hamcrest.CoreMatchers.notNullValue;
@@ -46,7 +42,7 @@
4642
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.EVENTS_ADDED_FOR_QUERYING;
4743
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.EVENTS_DROPPED_AND_RELEASED;
4844
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.EVENTS_RETURNED_FOR_INDEXING;
49-
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.POTENTIAL_DUPLICATES_DELETED;
45+
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.POTENTIAL_DUPLICATES;
5046

5147
@ExtendWith(MockitoExtension.class)
5248
public class ExistingDocumentQueryManagerTest {
@@ -73,7 +69,7 @@ public class ExistingDocumentQueryManagerTest {
7369
private Counter duplicateEventsAddedToQueryManager;
7470

7571
@Mock
76-
private Counter potentialDuplicatesDeleted;
72+
private Counter potentialDuplicates;
7773

7874
@Mock
7975
private AtomicInteger documentsCurrentlyQueried;
@@ -87,7 +83,7 @@ void setup() {
8783
when(pluginMetrics.counter(EVENTS_RETURNED_FOR_INDEXING)).thenReturn(eventsReturnedForIndexing);
8884
when(pluginMetrics.counter(DUPLICATE_EVENTS_IN_QUERY_MANAGER)).thenReturn(duplicateEventsAddedToQueryManager);
8985
when(pluginMetrics.gauge(eq(DOCUMENTS_CURRENTLY_BEING_QUERIED), any(AtomicInteger.class), any())).thenReturn(documentsCurrentlyQueried);
90-
when(pluginMetrics.counter(POTENTIAL_DUPLICATES_DELETED)).thenReturn(potentialDuplicatesDeleted);
86+
when(pluginMetrics.counter(POTENTIAL_DUPLICATES)).thenReturn(potentialDuplicates);
9187
queryTerm = UUID.randomUUID().toString();
9288
when(indexConfiguration.getQueryTerm()).thenReturn(queryTerm);
9389
}
@@ -138,6 +134,7 @@ void add_bulk_operation_and_found_in_query_drops_and_releases_event() throws IOE
138134
verify(eventsDroppedAndReleased).increment();
139135
verify(eventsAddedForQuerying).increment();
140136
verify(documentsCurrentlyQueried).incrementAndGet();
137+
verify(documentsCurrentlyQueried).decrementAndGet();
141138
verify(bulkOperationWrapper).releaseEventHandle(true);
142139

143140
verifyNoMoreInteractions(indexConfiguration);
@@ -247,7 +244,7 @@ void add_bulk_operation_for_same_term_value_increments_duplicate_events_metric()
247244
}
248245

249246
@Test
250-
void query_response_with_two_documents_with_same_term_value_deletes_duplicate_document() throws IOException {
247+
void query_response_with_two_documents_with_same_term_value_tracks_duplicate_document() throws IOException {
251248
final BulkOperationWrapper bulkOperationWrapper = mock(BulkOperationWrapper.class);
252249
final String index = UUID.randomUUID().toString();
253250
final String termValue = UUID.randomUUID().toString();
@@ -287,27 +284,15 @@ void query_response_with_two_documents_with_same_term_value_deletes_duplicate_do
287284
when(openSearchClient.msearch(any(MsearchRequest.class), eq(ObjectNode.class)))
288285
.thenReturn(msearchResponse);
289286

290-
when(openSearchClient.delete(any(Function.class))).thenReturn(mock(DeleteResponse.class));
291-
292287
final ExistingDocumentQueryManager objectUnderTest = createObjectUnderTest();
293288

294289
objectUnderTest.addBulkOperation(bulkOperationWrapper);
295290

296291
objectUnderTest.runQueryLoop();
297292

298-
final ArgumentCaptor<Function> deleteRequestArgumentCaptor = ArgumentCaptor.forClass(Function.class);
299-
300293
verify(openSearchClient).msearch(any(MsearchRequest.class), eq(ObjectNode.class));
301-
verify(openSearchClient).delete(deleteRequestArgumentCaptor.capture());
302-
verify(potentialDuplicatesDeleted).increment();
303-
304-
assertThat(deleteRequestArgumentCaptor.getAllValues().size(), equalTo(1));
305-
final Function<DeleteRequest.Builder, ObjectBuilder<DeleteRequest>> deleteRequestFunction = deleteRequestArgumentCaptor.getValue();
306-
307-
DeleteRequest actualRequest = deleteRequestFunction.apply(new DeleteRequest.Builder()).build();
308-
assertThat(actualRequest, notNullValue());
309-
assertThat(actualRequest.id(), equalTo(duplicateHit.id()));
310-
assertThat(actualRequest.index(), equalTo(index));
294+
verify(potentialDuplicates).increment();
295+
verifyNoMoreInteractions(openSearchClient);
311296

312297
}
313298
}

0 commit comments

Comments
 (0)