33import com .fasterxml .jackson .databind .node .ObjectNode ;
44import com .google .common .annotations .VisibleForTesting ;
55import io .micrometer .core .instrument .Counter ;
6+ import io .micrometer .core .instrument .Timer ;
67import org .opensearch .client .opensearch .OpenSearchClient ;
78import org .opensearch .client .opensearch ._types .FieldValue ;
89import org .opensearch .client .opensearch ._types .query_dsl .Query ;
@@ -43,14 +44,24 @@ public class ExistingDocumentQueryManager implements Runnable {
4344
4445 static final String DOCUMENTS_CURRENTLY_BEING_QUERIED = "documentsBeingQueried" ;
4546
47+ static final String DUPLICATE_EVENTS_IN_QUERY_MANAGER = "duplicateEventsInQueryManager" ;
4648
49+ static final String QUERY_TIME = "queryDuplicatesTime" ;
50+
51+ static final String POTENTIAL_DUPLICATES = "potentialDuplicates" ;
4752
4853 private final Counter eventsDroppedAndReleasedCounter ;
4954
5055 private final Counter eventsAddedForQuerying ;
5156
5257 private final Counter eventsReturnedForIndexing ;
5358
59+ private final Counter duplicateEventsInQueryManager ;
60+
61+ private final Counter potentialDuplicatesDeleted ;
62+
63+ private final Timer queryTimePerLoop ;
64+
5465 private final AtomicInteger documentsCurrentlyBeingQueried = new AtomicInteger (0 );
5566
5667 private final AtomicInteger documentsCurrentlyBeingQueriedGauge ;
@@ -88,6 +99,9 @@ public ExistingDocumentQueryManager(final IndexConfiguration indexConfiguration,
8899 this .eventsDroppedAndReleasedCounter = pluginMetrics .counter (EVENTS_DROPPED_AND_RELEASED );
89100 this .eventsReturnedForIndexing = pluginMetrics .counter (EVENTS_RETURNED_FOR_INDEXING );
90101 this .documentsCurrentlyBeingQueriedGauge = pluginMetrics .gauge (DOCUMENTS_CURRENTLY_BEING_QUERIED , documentsCurrentlyBeingQueried , AtomicInteger ::get );
102+ this .duplicateEventsInQueryManager = pluginMetrics .counter (DUPLICATE_EVENTS_IN_QUERY_MANAGER );
103+ this .queryTimePerLoop = pluginMetrics .timer (QUERY_TIME );
104+ this .potentialDuplicatesDeleted = pluginMetrics .counter (POTENTIAL_DUPLICATES );
91105 this .lockReadyToIngest = new ReentrantLock ();
92106 this .lockWaitingForQuery = new ReentrantLock ();
93107 }
@@ -96,7 +110,7 @@ public ExistingDocumentQueryManager(final IndexConfiguration indexConfiguration,
96110 public void run () {
97111 while (!Thread .currentThread ().isInterrupted () && !shouldStop ) {
98112 try {
99- runQueryLoop ( );
113+ queryTimePerLoop . record ( this :: runQueryLoop );
100114 } catch (final Exception e ) {
101115 LOG .error ("Exception in primary loop responsible for querying for existing documents, retrying" , e );
102116 } finally {
@@ -116,13 +130,14 @@ void runQueryLoop() {
116130 // Query for existing documents
117131 final MsearchRequest msearchRequest = buildMultiSearchRequest ();
118132 final MsearchResponse <ObjectNode > msearchResponse = queryForTermValues (msearchRequest );
119- lastQueryTime = Instant .now ();
120133
121134 // Drop and Release Existing Documents
122135 dropAndReleaseFoundEvents (msearchResponse );
123136
124137 // Move non-existing documents past query_duration to bulkOperationsReadyForIndex
125138 moveBulkRequestsThatHaveReachedQueryDuration ();
139+
140+ lastQueryTime = Instant .now ();
126141 }
127142 }
128143
@@ -134,12 +149,17 @@ public void addBulkOperation(final BulkOperationWrapper bulkOperationWrapper) {
134149 lockWaitingForQuery .lock ();
135150 final String termValue = bulkOperationWrapper .getTermValue ();
136151 try {
137- bulkOperationsWaitingForQuery .computeIfAbsent (bulkOperationWrapper .getIndex (),
152+ final QueryManagerBulkOperation queryManagerBulkOperation = bulkOperationsWaitingForQuery .computeIfAbsent (bulkOperationWrapper .getIndex (),
138153 k -> new ConcurrentHashMap <>()).put (termValue , new QueryManagerBulkOperation (bulkOperationWrapper , Instant .now (), termValue ));
154+ // Only increment if this is a new document
155+ if (queryManagerBulkOperation == null ) {
156+ documentsCurrentlyBeingQueriedGauge .incrementAndGet ();
157+ } else {
158+ duplicateEventsInQueryManager .increment ();
159+ }
139160 } finally {
140161 lockWaitingForQuery .unlock ();
141162 }
142- documentsCurrentlyBeingQueriedGauge .incrementAndGet ();
143163 eventsAddedForQuerying .increment ();
144164 }
145165
@@ -168,19 +188,25 @@ private MsearchRequest buildMultiSearchRequest() {
168188 for (final Map .Entry <String , Map <String , QueryManagerBulkOperation >> entry : bulkOperationsWaitingForQuery .entrySet ()) {
169189 final String index = entry .getKey ();
170190 final List <FieldValue > values = getTermValues (entry .getValue ().values ());
171-
172- m .searches (s -> s
173- .header (h -> h .index (index ))
174- .body (b -> b
175- .size (values .size ())
176- .source (source -> source .filter (f -> f .includes (queryTerm )))
177- .query (Query .of (q -> q
178- .terms (TermsQuery .of (t -> t
179- .field (queryTerm )
180- .terms (TermsQueryField .of (tf -> tf .value (values )))
181- ))
182- ))
183- ));
191+ final int batchSize = 1000 ;
192+
193+ LOG .info ("Creating search requests for {} query term values in batches of {}" , values .size (), batchSize );
194+ for (int i = 0 ; i < values .size (); i += batchSize ) {
195+ final List <FieldValue > chunk = values .subList (i , Math .min (i + batchSize , values .size ()));
196+
197+ m .searches (s -> s
198+ .header (h -> h .index (index ))
199+ .body (b -> b
200+ .size (chunk .size ())
201+ .source (source -> source .filter (f -> f .includes (queryTerm )))
202+ .query (Query .of (q -> q
203+ .terms (TermsQuery .of (t -> t
204+ .field (queryTerm )
205+ .terms (TermsQueryField .of (tf -> tf .value (chunk )))
206+ ))
207+ ))
208+ ));
209+ }
184210 }
185211 return m ;
186212 });
@@ -214,7 +240,7 @@ private void moveBulkRequestsThatHaveReachedQueryDuration() {
214240 while (bulkOperationIterator .hasNext ()) {
215241 final Map .Entry <String , QueryManagerBulkOperation > entry = bulkOperationIterator .next ();
216242 final QueryManagerBulkOperation bulkOperation = entry .getValue ();
217- if (bulkOperation .getStartTime ().plus (indexConfiguration .getQueryDuration ()).isBefore (lastQueryTime )) {
243+ if (lastQueryTime != null && bulkOperation .getStartTime ().plus (indexConfiguration .getQueryDuration ()).isBefore (lastQueryTime )) {
218244 lockReadyToIngest .lock ();
219245 try {
220246 LOG .debug ("Moving bulk operation for index {} and term value {} to be ingested after querying and finding no existing document" ,
@@ -234,7 +260,7 @@ private void moveBulkRequestsThatHaveReachedQueryDuration() {
234260 private void dropAndReleaseFoundEvents (final MsearchResponse <ObjectNode > msearchResponse ) {
235261 msearchResponse .responses ().forEach (response -> {
236262 if (response .isFailure ()) {
237- LOG .error ("Search response failed: {}" , response .failure ().error ().reason ());
263+ LOG .error ("Search response failed, potential for duplicate documents : {}" , response .failure ().error ().toString ());
238264 } else {
239265 response .result ().hits ().hits ().forEach (hit -> {
240266 final String indexForHit = hit .index ();
@@ -246,11 +272,14 @@ private void dropAndReleaseFoundEvents(final MsearchResponse<ObjectNode> msearch
246272 final Map <String , QueryManagerBulkOperation > bulkOperationsForIndex = bulkOperationsWaitingForQuery .get (indexForHit );
247273 final QueryManagerBulkOperation bulkOperationToRelease = bulkOperationsForIndex .get (queryTermValue );
248274 if (bulkOperationToRelease == null ) {
249- LOG .error ("bulk operation for term value {} is null" , queryTermValue );
275+ // Means two documents with the same query term value were found
276+ LOG .warn ("Bulk operation for term value {} with id {} is null, potentially a duplicate document" , queryTermValue , hit .id ());
277+ potentialDuplicatesDeleted .increment ();
250278 } else {
251279 LOG .debug ("Found document with query term {}, dropping and releasing Event handle" , queryTermValue );
252280 bulkOperationToRelease .getBulkOperationWrapper ().releaseEventHandle (true );
253281 eventsDroppedAndReleasedCounter .increment ();
282+ documentsCurrentlyBeingQueriedGauge .decrementAndGet ();
254283 bulkOperationsForIndex .remove (queryTermValue );
255284 }
256285 } finally {
0 commit comments