Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions data-prepper-plugins/opensearch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies {
testImplementation 'net.bytebuddy:byte-buddy:1.15.11'
testImplementation 'net.bytebuddy:byte-buddy-agent:1.15.11'
testImplementation testLibs.slf4j.simple
testImplementation project(path: ':data-prepper-test:test-common')
}

sourceSets {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.FieldValue;
import org.opensearch.client.opensearch._types.query_dsl.Query;
Expand Down Expand Up @@ -43,14 +44,24 @@ public class ExistingDocumentQueryManager implements Runnable {

static final String DOCUMENTS_CURRENTLY_BEING_QUERIED = "documentsBeingQueried";

static final String DUPLICATE_EVENTS_IN_QUERY_MANAGER = "duplicateEventsInQueryManager";

static final String QUERY_TIME = "queryDuplicatesTime";

static final String POTENTIAL_DUPLICATES = "potentialDuplicates";

private final Counter eventsDroppedAndReleasedCounter;

private final Counter eventsAddedForQuerying;

private final Counter eventsReturnedForIndexing;

private final Counter duplicateEventsInQueryManager;

private final Counter potentialDuplicatesDeleted;

private final Timer queryTimePerLoop;

private final AtomicInteger documentsCurrentlyBeingQueried = new AtomicInteger(0);

private final AtomicInteger documentsCurrentlyBeingQueriedGauge;
Expand Down Expand Up @@ -88,6 +99,9 @@ public ExistingDocumentQueryManager(final IndexConfiguration indexConfiguration,
this.eventsDroppedAndReleasedCounter = pluginMetrics.counter(EVENTS_DROPPED_AND_RELEASED);
this.eventsReturnedForIndexing = pluginMetrics.counter(EVENTS_RETURNED_FOR_INDEXING);
this.documentsCurrentlyBeingQueriedGauge = pluginMetrics.gauge(DOCUMENTS_CURRENTLY_BEING_QUERIED, documentsCurrentlyBeingQueried, AtomicInteger::get);
this.duplicateEventsInQueryManager = pluginMetrics.counter(DUPLICATE_EVENTS_IN_QUERY_MANAGER);
this.queryTimePerLoop = pluginMetrics.timer(QUERY_TIME);
this.potentialDuplicatesDeleted = pluginMetrics.counter(POTENTIAL_DUPLICATES);
this.lockReadyToIngest = new ReentrantLock();
this.lockWaitingForQuery = new ReentrantLock();
}
Expand All @@ -96,7 +110,7 @@ public ExistingDocumentQueryManager(final IndexConfiguration indexConfiguration,
public void run() {
while (!Thread.currentThread().isInterrupted() && !shouldStop) {
try {
runQueryLoop();
queryTimePerLoop.record(this::runQueryLoop);
} catch (final Exception e) {
LOG.error("Exception in primary loop responsible for querying for existing documents, retrying", e);
} finally {
Expand All @@ -116,13 +130,14 @@ void runQueryLoop() {
// Query for existing documents
final MsearchRequest msearchRequest = buildMultiSearchRequest();
final MsearchResponse<ObjectNode> msearchResponse = queryForTermValues(msearchRequest);
lastQueryTime = Instant.now();

// Drop and Release Existing Documents
dropAndReleaseFoundEvents(msearchResponse);

// Move non-existing documents past query_duration to bulkOperationsReadyForIndex
moveBulkRequestsThatHaveReachedQueryDuration();

lastQueryTime = Instant.now();
}
}

Expand All @@ -134,12 +149,17 @@ public void addBulkOperation(final BulkOperationWrapper bulkOperationWrapper) {
lockWaitingForQuery.lock();
final String termValue = bulkOperationWrapper.getTermValue();
try {
bulkOperationsWaitingForQuery.computeIfAbsent(bulkOperationWrapper.getIndex(),
final QueryManagerBulkOperation queryManagerBulkOperation = bulkOperationsWaitingForQuery.computeIfAbsent(bulkOperationWrapper.getIndex(),
k -> new ConcurrentHashMap<>()).put(termValue, new QueryManagerBulkOperation(bulkOperationWrapper, Instant.now(), termValue));
// Only increment if this is a new document
if (queryManagerBulkOperation == null) {
documentsCurrentlyBeingQueriedGauge.incrementAndGet();
} else {
duplicateEventsInQueryManager.increment();
}
} finally {
lockWaitingForQuery.unlock();
}
documentsCurrentlyBeingQueriedGauge.incrementAndGet();
eventsAddedForQuerying.increment();
}

Expand Down Expand Up @@ -168,19 +188,25 @@ private MsearchRequest buildMultiSearchRequest() {
for (final Map.Entry<String, Map<String, QueryManagerBulkOperation>> entry : bulkOperationsWaitingForQuery.entrySet()) {
final String index = entry.getKey();
final List<FieldValue> values = getTermValues(entry.getValue().values());

m.searches(s -> s
.header(h -> h.index(index))
.body(b -> b
.size(values.size())
.source(source -> source.filter(f -> f.includes(queryTerm)))
.query(Query.of(q -> q
.terms(TermsQuery.of(t -> t
.field(queryTerm)
.terms(TermsQueryField.of(tf -> tf.value(values)))
))
))
));
final int batchSize = 1000;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to make this configurable?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could make it configurable I suppose.


LOG.info("Creating search requests for {} query term values in batches of {}", values.size(), batchSize);
for (int i = 0; i < values.size(); i += batchSize) {
final List<FieldValue> chunk = values.subList(i, Math.min(i + batchSize, values.size()));

m.searches(s -> s
.header(h -> h.index(index))
.body(b -> b
.size(chunk.size())
.source(source -> source.filter(f -> f.includes(queryTerm)))
.query(Query.of(q -> q
.terms(TermsQuery.of(t -> t
.field(queryTerm)
.terms(TermsQueryField.of(tf -> tf.value(chunk)))
))
))
));
}
}
return m;
});
Expand Down Expand Up @@ -214,7 +240,7 @@ private void moveBulkRequestsThatHaveReachedQueryDuration() {
while (bulkOperationIterator.hasNext()) {
final Map.Entry<String, QueryManagerBulkOperation> entry = bulkOperationIterator.next();
final QueryManagerBulkOperation bulkOperation = entry.getValue();
if (bulkOperation.getStartTime().plus(indexConfiguration.getQueryDuration()).isBefore(lastQueryTime)) {
if (lastQueryTime != null && bulkOperation.getStartTime().plus(indexConfiguration.getQueryDuration()).isBefore(lastQueryTime)) {
lockReadyToIngest.lock();
try {
LOG.debug("Moving bulk operation for index {} and term value {} to be ingested after querying and finding no existing document",
Expand All @@ -234,7 +260,7 @@ private void moveBulkRequestsThatHaveReachedQueryDuration() {
private void dropAndReleaseFoundEvents(final MsearchResponse<ObjectNode> msearchResponse) {
msearchResponse.responses().forEach(response -> {
if (response.isFailure()) {
LOG.error("Search response failed: {}", response.failure().error().reason());
LOG.error("Search response failed, potential for duplicate documents: {}", response.failure().error().toString());
} else {
response.result().hits().hits().forEach(hit -> {
final String indexForHit = hit.index();
Expand All @@ -246,11 +272,14 @@ private void dropAndReleaseFoundEvents(final MsearchResponse<ObjectNode> msearch
final Map<String, QueryManagerBulkOperation> bulkOperationsForIndex = bulkOperationsWaitingForQuery.get(indexForHit);
final QueryManagerBulkOperation bulkOperationToRelease = bulkOperationsForIndex.get(queryTermValue);
if (bulkOperationToRelease == null) {
LOG.error("bulk operation for term value {} is null", queryTermValue);
// Means two documents with the same query term value were found
LOG.warn("Bulk operation for term value {} with id {} is null, potentially a duplicate document", queryTermValue, hit.id());
potentialDuplicatesDeleted.increment();
} else {
LOG.debug("Found document with query term {}, dropping and releasing Event handle", queryTermValue);
bulkOperationToRelease.getBulkOperationWrapper().releaseEventHandle(true);
eventsDroppedAndReleasedCounter.increment();
documentsCurrentlyBeingQueriedGauge.decrementAndGet();
bulkOperationsForIndex.remove(queryTermValue);
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import org.opensearch.client.opensearch.core.search.HitsMetadata;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.sink.opensearch.BulkOperationWrapper;
import org.opensearch.dataprepper.test.helper.ReflectivelySetField;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.UUID;
Expand All @@ -36,9 +38,11 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.DOCUMENTS_CURRENTLY_BEING_QUERIED;
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.DUPLICATE_EVENTS_IN_QUERY_MANAGER;
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.EVENTS_ADDED_FOR_QUERYING;
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.EVENTS_DROPPED_AND_RELEASED;
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.EVENTS_RETURNED_FOR_INDEXING;
import static org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager.POTENTIAL_DUPLICATES;

@ExtendWith(MockitoExtension.class)
public class ExistingDocumentQueryManagerTest {
Expand All @@ -61,6 +65,12 @@ public class ExistingDocumentQueryManagerTest {
@Mock
private Counter eventsReturnedForIndexing;

@Mock
private Counter duplicateEventsAddedToQueryManager;

@Mock
private Counter potentialDuplicates;

@Mock
private AtomicInteger documentsCurrentlyQueried;

Expand All @@ -71,7 +81,9 @@ void setup() {
when(pluginMetrics.counter(EVENTS_ADDED_FOR_QUERYING)).thenReturn(eventsAddedForQuerying);
when(pluginMetrics.counter(EVENTS_DROPPED_AND_RELEASED)).thenReturn(eventsDroppedAndReleased);
when(pluginMetrics.counter(EVENTS_RETURNED_FOR_INDEXING)).thenReturn(eventsReturnedForIndexing);
when(pluginMetrics.counter(DUPLICATE_EVENTS_IN_QUERY_MANAGER)).thenReturn(duplicateEventsAddedToQueryManager);
when(pluginMetrics.gauge(eq(DOCUMENTS_CURRENTLY_BEING_QUERIED), any(AtomicInteger.class), any())).thenReturn(documentsCurrentlyQueried);
when(pluginMetrics.counter(POTENTIAL_DUPLICATES)).thenReturn(potentialDuplicates);
queryTerm = UUID.randomUUID().toString();
when(indexConfiguration.getQueryTerm()).thenReturn(queryTerm);
}
Expand Down Expand Up @@ -122,13 +134,14 @@ void add_bulk_operation_and_found_in_query_drops_and_releases_event() throws IOE
verify(eventsDroppedAndReleased).increment();
verify(eventsAddedForQuerying).increment();
verify(documentsCurrentlyQueried).incrementAndGet();
verify(documentsCurrentlyQueried).decrementAndGet();
verify(bulkOperationWrapper).releaseEventHandle(true);

verifyNoMoreInteractions(indexConfiguration);
}

@Test
void add_bulk_operation_and_not_found_in_query_returns_as_ready_to_ingest() throws IOException, InterruptedException {
void add_bulk_operation_and_not_found_in_query_returns_as_ready_to_ingest() throws IOException, InterruptedException, NoSuchFieldException, IllegalAccessException {
when(indexConfiguration.getQueryDuration()).thenReturn(Duration.ofMillis(1));

final BulkOperationWrapper bulkOperationWrapper = mock(BulkOperationWrapper.class);
Expand Down Expand Up @@ -164,6 +177,8 @@ void add_bulk_operation_and_not_found_in_query_returns_as_ready_to_ingest() thro

final ExistingDocumentQueryManager objectUnderTest = createObjectUnderTest();

ReflectivelySetField.setField(ExistingDocumentQueryManager.class, objectUnderTest, "lastQueryTime", Instant.now().plusMillis(100));

objectUnderTest.addBulkOperation(bulkOperationWrapper);

Thread.sleep(20);
Expand Down Expand Up @@ -209,4 +224,75 @@ void add_bulk_operation_and_not_found_in_query_returns_as_ready_to_ingest() thro
assertThat(msearchRequest.searches().get(0).body().query().terms().terms().value().get(0), notNullValue());
assertThat(msearchRequest.searches().get(0).body().query().terms().terms().value().get(0).stringValue(), equalTo(termValue));
}

@Test
void add_bulk_operation_for_same_term_value_increments_duplicate_events_metric() {
final BulkOperationWrapper bulkOperationWrapper = mock(BulkOperationWrapper.class);
final String index = UUID.randomUUID().toString();
final String termValue = UUID.randomUUID().toString();
when(bulkOperationWrapper.getTermValue()).thenReturn(termValue);
when(bulkOperationWrapper.getIndex()).thenReturn(index);

final ExistingDocumentQueryManager objectUnderTest = createObjectUnderTest();

objectUnderTest.addBulkOperation(bulkOperationWrapper);

objectUnderTest.addBulkOperation(bulkOperationWrapper);

verify(duplicateEventsAddedToQueryManager).increment();
verify(documentsCurrentlyQueried).incrementAndGet();
}

@Test
void query_response_with_two_documents_with_same_term_value_tracks_duplicate_document() throws IOException {
final BulkOperationWrapper bulkOperationWrapper = mock(BulkOperationWrapper.class);
final String index = UUID.randomUUID().toString();
final String termValue = UUID.randomUUID().toString();
when(bulkOperationWrapper.getTermValue()).thenReturn(termValue);
when(bulkOperationWrapper.getIndex()).thenReturn(index);

final MsearchResponse<ObjectNode> msearchResponse = mock(MsearchResponse.class);
final MultiSearchResponseItem<ObjectNode> responseItem = mock(MultiSearchResponseItem.class);
when(responseItem.isFailure()).thenReturn(false);

final MultiSearchItem<ObjectNode> multiSearchItem = mock(MultiSearchItem.class);
final HitsMetadata<ObjectNode> hitsMetadata = mock(HitsMetadata.class);
final Hit<ObjectNode> hit = mock(Hit.class);
when(hit.index()).thenReturn(index);

final ObjectNode objectNode = mock(ObjectNode.class);
final JsonNode jsonNode = mock(JsonNode.class);
when(jsonNode.textValue()).thenReturn(termValue);
when(objectNode.findValue(queryTerm)).thenReturn(jsonNode);
when(hit.source()).thenReturn(objectNode);

final Hit<ObjectNode> duplicateHit = mock(Hit.class);
when(duplicateHit.index()).thenReturn(index);
when(duplicateHit.id()).thenReturn(UUID.randomUUID().toString());

when(jsonNode.textValue()).thenReturn(termValue);
when(objectNode.findValue(queryTerm)).thenReturn(jsonNode);
when(duplicateHit.source()).thenReturn(objectNode);

when(multiSearchItem.hits()).thenReturn(hitsMetadata);
when(hitsMetadata.hits()).thenReturn(List.of(hit, duplicateHit));

when(responseItem.result()).thenReturn(multiSearchItem);

when(msearchResponse.responses()).thenReturn(List.of(responseItem));

when(openSearchClient.msearch(any(MsearchRequest.class), eq(ObjectNode.class)))
.thenReturn(msearchResponse);

final ExistingDocumentQueryManager objectUnderTest = createObjectUnderTest();

objectUnderTest.addBulkOperation(bulkOperationWrapper);

objectUnderTest.runQueryLoop();

verify(openSearchClient).msearch(any(MsearchRequest.class), eq(ObjectNode.class));
verify(potentialDuplicates).increment();
verifyNoMoreInteractions(openSearchClient);

}
}
Loading