diff --git a/data-prepper-plugins/opensearch/build.gradle b/data-prepper-plugins/opensearch/build.gradle index 39c6477a12..ae6c3f32b3 100644 --- a/data-prepper-plugins/opensearch/build.gradle +++ b/data-prepper-plugins/opensearch/build.gradle @@ -46,6 +46,7 @@ dependencies { testImplementation 'net.bytebuddy:byte-buddy:1.15.11' testImplementation 'net.bytebuddy:byte-buddy-agent:1.15.11' testImplementation testLibs.slf4j.simple + testImplementation project(path: ':data-prepper-test:test-common') } sourceSets { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ExistingDocumentQueryManager.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ExistingDocumentQueryManager.java index daab64cfb3..3359e1b493 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ExistingDocumentQueryManager.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ExistingDocumentQueryManager.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch._types.FieldValue; import org.opensearch.client.opensearch._types.query_dsl.Query; @@ -43,7 +44,11 @@ public class ExistingDocumentQueryManager implements Runnable { static final String DOCUMENTS_CURRENTLY_BEING_QUERIED = "documentsBeingQueried"; + static final String DUPLICATE_EVENTS_IN_QUERY_MANAGER = "duplicateEventsInQueryManager"; + static final String QUERY_TIME = "queryDuplicatesTime"; + + static final String POTENTIAL_DUPLICATES = "potentialDuplicates"; private final Counter eventsDroppedAndReleasedCounter; @@ -51,6 +56,12 @@ public class ExistingDocumentQueryManager implements Runnable { private final Counter eventsReturnedForIndexing; + private final Counter duplicateEventsInQueryManager; + + private final Counter potentialDuplicatesDeleted; + + private final Timer queryTimePerLoop; + private final AtomicInteger documentsCurrentlyBeingQueried = new AtomicInteger(0); private final AtomicInteger documentsCurrentlyBeingQueriedGauge; @@ -88,6 +99,9 @@ public ExistingDocumentQueryManager(final IndexConfiguration indexConfiguration, this.eventsDroppedAndReleasedCounter = pluginMetrics.counter(EVENTS_DROPPED_AND_RELEASED); this.eventsReturnedForIndexing = pluginMetrics.counter(EVENTS_RETURNED_FOR_INDEXING); this.documentsCurrentlyBeingQueriedGauge = pluginMetrics.gauge(DOCUMENTS_CURRENTLY_BEING_QUERIED, documentsCurrentlyBeingQueried, AtomicInteger::get); + this.duplicateEventsInQueryManager = pluginMetrics.counter(DUPLICATE_EVENTS_IN_QUERY_MANAGER); + this.queryTimePerLoop = pluginMetrics.timer(QUERY_TIME); + this.potentialDuplicatesDeleted = pluginMetrics.counter(POTENTIAL_DUPLICATES); this.lockReadyToIngest = new ReentrantLock(); this.lockWaitingForQuery = new ReentrantLock(); } @@ -96,7 +110,7 @@ public ExistingDocumentQueryManager(final IndexConfiguration indexConfiguration, public void run() { while (!Thread.currentThread().isInterrupted() && !shouldStop) { try { - runQueryLoop(); + queryTimePerLoop.record(this::runQueryLoop); } catch (final Exception e) { LOG.error("Exception in primary loop responsible for querying for existing documents, retrying", e); } finally { @@ -116,13 +130,14 @@ void runQueryLoop() { // Query for existing documents final MsearchRequest msearchRequest = buildMultiSearchRequest(); final MsearchResponse msearchResponse = queryForTermValues(msearchRequest); - lastQueryTime = Instant.now(); // Drop and Release Existing Documents dropAndReleaseFoundEvents(msearchResponse); // Move non-existing documents past query_duration to bulkOperationsReadyForIndex moveBulkRequestsThatHaveReachedQueryDuration(); + + lastQueryTime = Instant.now(); } } @@ -134,12 +149,17 @@ public void addBulkOperation(final BulkOperationWrapper bulkOperationWrapper) { lockWaitingForQuery.lock(); final String termValue = bulkOperationWrapper.getTermValue(); try { - bulkOperationsWaitingForQuery.computeIfAbsent(bulkOperationWrapper.getIndex(), + final QueryManagerBulkOperation queryManagerBulkOperation = bulkOperationsWaitingForQuery.computeIfAbsent(bulkOperationWrapper.getIndex(), k -> new ConcurrentHashMap<>()).put(termValue, new QueryManagerBulkOperation(bulkOperationWrapper, Instant.now(), termValue)); + // Only increment if this is a new document + if (queryManagerBulkOperation == null) { + documentsCurrentlyBeingQueriedGauge.incrementAndGet(); + } else { + duplicateEventsInQueryManager.increment(); + } } finally { lockWaitingForQuery.unlock(); } - documentsCurrentlyBeingQueriedGauge.incrementAndGet(); eventsAddedForQuerying.increment(); } @@ -168,19 +188,25 @@ private MsearchRequest buildMultiSearchRequest() { for (final Map.Entry> entry : bulkOperationsWaitingForQuery.entrySet()) { final String index = entry.getKey(); final List values = getTermValues(entry.getValue().values()); - - m.searches(s -> s - .header(h -> h.index(index)) - .body(b -> b - .size(values.size()) - .source(source -> source.filter(f -> f.includes(queryTerm))) - .query(Query.of(q -> q - .terms(TermsQuery.of(t -> t - .field(queryTerm) - .terms(TermsQueryField.of(tf -> tf.value(values))) - )) - )) - )); + final int batchSize = 1000; + + LOG.info("Creating search requests for {} query term values in batches of {}", values.size(), batchSize); + for (int i = 0; i < values.size(); i += batchSize) { + final List chunk = values.subList(i, Math.min(i + batchSize, values.size())); + + m.searches(s -> s + .header(h -> h.index(index)) + .body(b -> b + .size(chunk.size()) + .source(source -> source.filter(f -> f.includes(queryTerm))) + .query(Query.of(q -> q + .terms(TermsQuery.of(t -> t + .field(queryTerm) + .terms(TermsQueryField.of(tf -> tf.value(chunk))) + )) + )) + )); + } } return m; }); @@ -214,7 +240,7 @@ private void moveBulkRequestsThatHaveReachedQueryDuration() { while (bulkOperationIterator.hasNext()) { final Map.Entry entry = bulkOperationIterator.next(); final QueryManagerBulkOperation bulkOperation = entry.getValue(); - if (bulkOperation.getStartTime().plus(indexConfiguration.getQueryDuration()).isBefore(lastQueryTime)) { + if (lastQueryTime != null && bulkOperation.getStartTime().plus(indexConfiguration.getQueryDuration()).isBefore(lastQueryTime)) { lockReadyToIngest.lock(); try { LOG.debug("Moving bulk operation for index {} and term value {} to be ingested after querying and finding no existing document", @@ -234,7 +260,7 @@ private void moveBulkRequestsThatHaveReachedQueryDuration() { private void dropAndReleaseFoundEvents(final MsearchResponse msearchResponse) { msearchResponse.responses().forEach(response -> { if (response.isFailure()) { - LOG.error("Search response failed: {}", response.failure().error().reason()); + LOG.error("Search response failed, potential for duplicate documents: {}", response.failure().error().toString()); } else { response.result().hits().hits().forEach(hit -> { final String indexForHit = hit.index(); @@ -246,11 +272,14 @@ private void dropAndReleaseFoundEvents(final MsearchResponse msearch final Map bulkOperationsForIndex = bulkOperationsWaitingForQuery.get(indexForHit); final QueryManagerBulkOperation bulkOperationToRelease = bulkOperationsForIndex.get(queryTermValue); if (bulkOperationToRelease == null) { - LOG.error("bulk operation for term value {} is null", queryTermValue); + // Means two documents with the same query term value were found + LOG.warn("Bulk operation for term value {} with id {} is null, potentially a duplicate document", queryTermValue, hit.id()); + potentialDuplicatesDeleted.increment(); } else { LOG.debug("Found document with query term {}, dropping and releasing Event handle", queryTermValue); bulkOperationToRelease.getBulkOperationWrapper().releaseEventHandle(true); eventsDroppedAndReleasedCounter.increment(); + documentsCurrentlyBeingQueriedGauge.decrementAndGet(); bulkOperationsForIndex.remove(queryTermValue); } } finally { diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ExistingDocumentQueryManagerTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ExistingDocumentQueryManagerTest.java index db5a434ea7..e076c61176 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ExistingDocumentQueryManagerTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/ExistingDocumentQueryManagerTest.java @@ -18,9 +18,11 @@ import org.opensearch.client.opensearch.core.search.HitsMetadata; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.plugins.sink.opensearch.BulkOperationWrapper; +import org.opensearch.dataprepper.test.helper.ReflectivelySetField; import java.io.IOException; import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.Set; import java.util.UUID; @@ -36,9 +38,11 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.DOCUMENTS_CURRENTLY_BEING_QUERIED; +import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.DUPLICATE_EVENTS_IN_QUERY_MANAGER; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.EVENTS_ADDED_FOR_QUERYING; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.EVENTS_DROPPED_AND_RELEASED; import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.EVENTS_RETURNED_FOR_INDEXING; +import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.POTENTIAL_DUPLICATES; @ExtendWith(MockitoExtension.class) public class ExistingDocumentQueryManagerTest { @@ -61,6 +65,12 @@ public class ExistingDocumentQueryManagerTest { @Mock private Counter eventsReturnedForIndexing; + @Mock + private Counter duplicateEventsAddedToQueryManager; + + @Mock + private Counter potentialDuplicates; + @Mock private AtomicInteger documentsCurrentlyQueried; @@ -71,7 +81,9 @@ void setup() { when(pluginMetrics.counter(EVENTS_ADDED_FOR_QUERYING)).thenReturn(eventsAddedForQuerying); when(pluginMetrics.counter(EVENTS_DROPPED_AND_RELEASED)).thenReturn(eventsDroppedAndReleased); when(pluginMetrics.counter(EVENTS_RETURNED_FOR_INDEXING)).thenReturn(eventsReturnedForIndexing); + when(pluginMetrics.counter(DUPLICATE_EVENTS_IN_QUERY_MANAGER)).thenReturn(duplicateEventsAddedToQueryManager); when(pluginMetrics.gauge(eq(DOCUMENTS_CURRENTLY_BEING_QUERIED), any(AtomicInteger.class), any())).thenReturn(documentsCurrentlyQueried); + when(pluginMetrics.counter(POTENTIAL_DUPLICATES)).thenReturn(potentialDuplicates); queryTerm = UUID.randomUUID().toString(); when(indexConfiguration.getQueryTerm()).thenReturn(queryTerm); } @@ -122,13 +134,14 @@ void add_bulk_operation_and_found_in_query_drops_and_releases_event() throws IOE verify(eventsDroppedAndReleased).increment(); verify(eventsAddedForQuerying).increment(); verify(documentsCurrentlyQueried).incrementAndGet(); + verify(documentsCurrentlyQueried).decrementAndGet(); verify(bulkOperationWrapper).releaseEventHandle(true); verifyNoMoreInteractions(indexConfiguration); } @Test - void add_bulk_operation_and_not_found_in_query_returns_as_ready_to_ingest() throws IOException, InterruptedException { + void add_bulk_operation_and_not_found_in_query_returns_as_ready_to_ingest() throws IOException, InterruptedException, NoSuchFieldException, IllegalAccessException { when(indexConfiguration.getQueryDuration()).thenReturn(Duration.ofMillis(1)); final BulkOperationWrapper bulkOperationWrapper = mock(BulkOperationWrapper.class); @@ -164,6 +177,8 @@ void add_bulk_operation_and_not_found_in_query_returns_as_ready_to_ingest() thro final ExistingDocumentQueryManager objectUnderTest = createObjectUnderTest(); + ReflectivelySetField.setField(ExistingDocumentQueryManager.class, objectUnderTest, "lastQueryTime", Instant.now().plusMillis(100)); + objectUnderTest.addBulkOperation(bulkOperationWrapper); Thread.sleep(20); @@ -209,4 +224,75 @@ void add_bulk_operation_and_not_found_in_query_returns_as_ready_to_ingest() thro assertThat(msearchRequest.searches().get(0).body().query().terms().terms().value().get(0), notNullValue()); assertThat(msearchRequest.searches().get(0).body().query().terms().terms().value().get(0).stringValue(), equalTo(termValue)); } + + @Test + void add_bulk_operation_for_same_term_value_increments_duplicate_events_metric() { + final BulkOperationWrapper bulkOperationWrapper = mock(BulkOperationWrapper.class); + final String index = UUID.randomUUID().toString(); + final String termValue = UUID.randomUUID().toString(); + when(bulkOperationWrapper.getTermValue()).thenReturn(termValue); + when(bulkOperationWrapper.getIndex()).thenReturn(index); + + final ExistingDocumentQueryManager objectUnderTest = createObjectUnderTest(); + + objectUnderTest.addBulkOperation(bulkOperationWrapper); + + objectUnderTest.addBulkOperation(bulkOperationWrapper); + + verify(duplicateEventsAddedToQueryManager).increment(); + verify(documentsCurrentlyQueried).incrementAndGet(); + } + + @Test + void query_response_with_two_documents_with_same_term_value_tracks_duplicate_document() throws IOException { + final BulkOperationWrapper bulkOperationWrapper = mock(BulkOperationWrapper.class); + final String index = UUID.randomUUID().toString(); + final String termValue = UUID.randomUUID().toString(); + when(bulkOperationWrapper.getTermValue()).thenReturn(termValue); + when(bulkOperationWrapper.getIndex()).thenReturn(index); + + final MsearchResponse msearchResponse = mock(MsearchResponse.class); + final MultiSearchResponseItem responseItem = mock(MultiSearchResponseItem.class); + when(responseItem.isFailure()).thenReturn(false); + + final MultiSearchItem multiSearchItem = mock(MultiSearchItem.class); + final HitsMetadata hitsMetadata = mock(HitsMetadata.class); + final Hit hit = mock(Hit.class); + when(hit.index()).thenReturn(index); + + final ObjectNode objectNode = mock(ObjectNode.class); + final JsonNode jsonNode = mock(JsonNode.class); + when(jsonNode.textValue()).thenReturn(termValue); + when(objectNode.findValue(queryTerm)).thenReturn(jsonNode); + when(hit.source()).thenReturn(objectNode); + + final Hit duplicateHit = mock(Hit.class); + when(duplicateHit.index()).thenReturn(index); + when(duplicateHit.id()).thenReturn(UUID.randomUUID().toString()); + + when(jsonNode.textValue()).thenReturn(termValue); + when(objectNode.findValue(queryTerm)).thenReturn(jsonNode); + when(duplicateHit.source()).thenReturn(objectNode); + + when(multiSearchItem.hits()).thenReturn(hitsMetadata); + when(hitsMetadata.hits()).thenReturn(List.of(hit, duplicateHit)); + + when(responseItem.result()).thenReturn(multiSearchItem); + + when(msearchResponse.responses()).thenReturn(List.of(responseItem)); + + when(openSearchClient.msearch(any(MsearchRequest.class), eq(ObjectNode.class))) + .thenReturn(msearchResponse); + + final ExistingDocumentQueryManager objectUnderTest = createObjectUnderTest(); + + objectUnderTest.addBulkOperation(bulkOperationWrapper); + + objectUnderTest.runQueryLoop(); + + verify(openSearchClient).msearch(any(MsearchRequest.class), eq(ObjectNode.class)); + verify(potentialDuplicates).increment(); + verifyNoMoreInteractions(openSearchClient); + + } }