diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java index 627ef8adea..651fd4edac 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java @@ -183,6 +183,9 @@ private void processIndex(final SourcePartition op } } while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize()); + LOG.info("Received {} documents in latest search request, and batch size is {}, exiting pagination", + searchWithSearchAfterResults.getDocuments().size(), searchConfiguration.getBatchSize()); + try { bufferAccumulator.flush(); } catch (final Exception e) { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java index 8f381dce99..9e54b4cbcc 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java @@ -221,6 +221,9 @@ private void processIndex(final SourcePartition op } } while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize()); + LOG.info("Received {} documents in latest search request, and batch size is {}, exiting pagination", + searchWithSearchAfterResults.getDocuments().size(), searchConfiguration.getBatchSize()); + try { bufferAccumulator.flush(); } catch (final Exception e) { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java index 7c43b48c46..e116e41e46 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java @@ -188,6 +188,9 @@ private void processIndex(final SourcePartition op } while (searchScrollResponse.getDocuments().size() == batchSize); } + LOG.info("Received {} documents in latest search request, and batch size is {}, exiting pagination", + searchScrollResponse.getDocuments().size(), batchSize); + deleteScroll(createScrollResponse.getScrollId()); try { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java index a0f5a6672f..5a6fc23db7 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java @@ -55,6 +55,7 @@ import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME; public class ElasticsearchAccessor implements SearchAccessor, ClusterClientFactory { @@ -122,6 +123,7 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))), SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSortBuilder -> fieldSortBuilder.field("_id").order(SortOrder.Asc))))) ) + .version(true) .query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery)))); if (Objects.nonNull(searchPointInTimeRequest.getSearchAfter())) { @@ -161,6 +163,7 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR .scroll(Time.of(time -> time.time(createScrollRequest.getScrollTime()))) .sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc))))) .size(createScrollRequest.getSize()) + .version(true) .index(createScrollRequest.getIndex())), ObjectNode.class); } catch (final ElasticsearchException e) { if (isDueToNoIndexFound(e)) { @@ -233,6 +236,7 @@ public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchCon SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))), SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSortBuilder -> fieldSortBuilder.field("_id").order(SortOrder.Asc))))) ) + .version(true) .query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery)))); if (Objects.nonNull(noSearchContextSearchRequest.getSearchAfter())) { @@ -295,7 +299,10 @@ private List getDocumentsFromResponse(final SearchResponse se return searchResponse.hits().hits().stream() .map(hit -> JacksonEvent.builder() .withData(hit.source()) - .withEventMetadataAttributes(Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(), INDEX_METADATA_ATTRIBUTE_NAME, hit.index())) + .withEventMetadataAttributes( + Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(), + INDEX_METADATA_ATTRIBUTE_NAME, hit.index(), + DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME, hit.version())) .withEventType(EventType.DOCUMENT.toString()).build()) .collect(Collectors.toList()); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java index 6dcb21e445..bc76b4de12 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java @@ -55,6 +55,7 @@ import java.util.stream.Collectors; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME; public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory { @@ -120,6 +121,7 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))), SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc))))) ) + .version(true) .query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery)))); if (Objects.nonNull(searchPointInTimeRequest.getSearchAfter())) { @@ -157,6 +159,7 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR .scroll(Time.of(time -> time.time(createScrollRequest.getScrollTime()))) .sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc))))) .size(createScrollRequest.getSize()) + .version(true) .index(createScrollRequest.getIndex())), ObjectNode.class); } catch (final OpenSearchException e) { if (isDueToNoIndexFound(e)) { @@ -227,6 +230,7 @@ public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchCon SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))), SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc))))) ) + .version(true) .query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery)))); if (Objects.nonNull(noSearchContextSearchRequest.getSearchAfter())) { @@ -294,7 +298,10 @@ private List getDocumentsFromResponse(final SearchResponse se return searchResponse.hits().hits().stream() .map(hit -> JacksonEvent.builder() .withData(hit.source()) - .withEventMetadataAttributes(Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(), INDEX_METADATA_ATTRIBUTE_NAME, hit.index())) + .withEventMetadataAttributes( + Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(), + INDEX_METADATA_ATTRIBUTE_NAME, hit.index(), + DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME, hit.version())) .withEventType(EventType.DOCUMENT.toString()).build()) .collect(Collectors.toList()); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/MetadataKeyAttributes.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/MetadataKeyAttributes.java index 68fbc4677b..420f9e403f 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/MetadataKeyAttributes.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/MetadataKeyAttributes.java @@ -8,4 +8,5 @@ public class MetadataKeyAttributes { public static final String DOCUMENT_ID_METADATA_ATTRIBUTE_NAME = "opensearch-document_id"; public static final String INDEX_METADATA_ATTRIBUTE_NAME = "opensearch-index"; + public static final String DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME = "opensearch_document_version"; } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessorTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessorTest.java index f3ba2f0956..405e8d16e0 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessorTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessorTest.java @@ -67,6 +67,7 @@ import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.INDEX_NOT_FOUND_EXCEPTION; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.PIT_RESOURCE_LIMIT_ERROR_TYPE; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME; @ExtendWith(MockitoExtension.class) public class ElasticsearchAccessorTest { @@ -127,11 +128,13 @@ void create_scroll_returns_expected_create_scroll_response() throws IOException when(firstHit.id()).thenReturn(UUID.randomUUID().toString()); when(firstHit.index()).thenReturn(UUID.randomUUID().toString()); when(firstHit.source()).thenReturn(mock(ObjectNode.class)); + when(firstHit.version()).thenReturn(1L); final Hit secondHit = mock(Hit.class); when(secondHit.id()).thenReturn(UUID.randomUUID().toString()); when(secondHit.index()).thenReturn(UUID.randomUUID().toString()); when(secondHit.source()).thenReturn(mock(ObjectNode.class)); + when(secondHit.version()).thenReturn(2L); hits.add(firstHit); hits.add(secondHit); @@ -148,6 +151,14 @@ void create_scroll_returns_expected_create_scroll_response() throws IOException assertThat(createScrollResponse.getScrollId(), equalTo(scrollId)); assertThat(createScrollResponse.getDocuments(), notNullValue()); assertThat(createScrollResponse.getDocuments().size(), equalTo(2)); + assertThat(createScrollResponse.getDocuments().get(0), notNullValue()); + assertThat(createScrollResponse.getDocuments().get(0).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(1L)); + assertThat(createScrollResponse.getDocuments().get(1), notNullValue()); + assertThat(createScrollResponse.getDocuments().get(1).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(2L)); + + final SearchRequest searchRequest = searchRequestArgumentCaptor.getValue(); + assertThat(searchRequest, notNullValue()); + assertThat(searchRequest.version(), equalTo(true)); } @Test @@ -452,12 +463,14 @@ void search_with_pit_returns_expected_SearchPointInTimeResponse(final boolean ha when(firstHit.id()).thenReturn(UUID.randomUUID().toString()); when(firstHit.index()).thenReturn(UUID.randomUUID().toString()); when(firstHit.source()).thenReturn(mock(ObjectNode.class)); + when(firstHit.version()).thenReturn(1L); final Hit secondHit = mock(Hit.class); when(secondHit.id()).thenReturn(UUID.randomUUID().toString()); when(secondHit.index()).thenReturn(UUID.randomUUID().toString()); when(secondHit.source()).thenReturn(mock(ObjectNode.class)); when(secondHit.sort()).thenReturn(searchAfter); + when(secondHit.version()).thenReturn(2L); hits.add(firstHit); hits.add(secondHit); @@ -476,6 +489,14 @@ void search_with_pit_returns_expected_SearchPointInTimeResponse(final boolean ha assertThat(searchWithSearchAfterResults.getDocuments().size(), equalTo(2)); assertThat(searchWithSearchAfterResults.getNextSearchAfter(), equalTo(secondHit.sort())); + assertThat(searchWithSearchAfterResults.getDocuments().get(0), notNullValue()); + assertThat(searchWithSearchAfterResults.getDocuments().get(0).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(1L)); + assertThat(searchWithSearchAfterResults.getDocuments().get(1), notNullValue()); + assertThat(searchWithSearchAfterResults.getDocuments().get(1).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(2L)); + + final SearchRequest searchRequest = searchRequestArgumentCaptor.getValue(); + assertThat(searchRequest, notNullValue()); + assertThat(searchRequest.version(), equalTo(true)); } @ParameterizedTest @@ -502,12 +523,14 @@ void search_without_search_context_returns_expected_SearchPointInTimeResponse(fi when(firstHit.id()).thenReturn(UUID.randomUUID().toString()); when(firstHit.index()).thenReturn(index); when(firstHit.source()).thenReturn(mock(ObjectNode.class)); + when(firstHit.version()).thenReturn(1L); final Hit secondHit = mock(Hit.class); when(secondHit.id()).thenReturn(UUID.randomUUID().toString()); when(secondHit.index()).thenReturn(index); when(secondHit.source()).thenReturn(mock(ObjectNode.class)); when(secondHit.sort()).thenReturn(searchAfter); + when(secondHit.version()).thenReturn(2L); hits.add(firstHit); hits.add(secondHit); @@ -524,8 +547,16 @@ void search_without_search_context_returns_expected_SearchPointInTimeResponse(fi assertThat(searchWithSearchAfterResults, notNullValue()); assertThat(searchWithSearchAfterResults.getDocuments(), notNullValue()); assertThat(searchWithSearchAfterResults.getDocuments().size(), equalTo(2)); + assertThat(searchWithSearchAfterResults.getDocuments().get(0), notNullValue()); + assertThat(searchWithSearchAfterResults.getDocuments().get(0).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(1L)); + assertThat(searchWithSearchAfterResults.getDocuments().get(1), notNullValue()); + assertThat(searchWithSearchAfterResults.getDocuments().get(1).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(2L)); assertThat(searchWithSearchAfterResults.getNextSearchAfter(), equalTo(secondHit.sort())); + + final SearchRequest searchRequest = searchRequestArgumentCaptor.getValue(); + assertThat(searchRequest, notNullValue()); + assertThat(searchRequest.version(), equalTo(true)); } @Test @@ -544,11 +575,13 @@ void search_with_scroll_returns_expected_SearchScrollResponse() throws IOExcepti when(firstHit.id()).thenReturn(UUID.randomUUID().toString()); when(firstHit.index()).thenReturn(UUID.randomUUID().toString()); when(firstHit.source()).thenReturn(mock(ObjectNode.class)); + when(firstHit.version()).thenReturn(1L); final Hit secondHit = mock(Hit.class); when(secondHit.id()).thenReturn(UUID.randomUUID().toString()); when(secondHit.index()).thenReturn(UUID.randomUUID().toString()); when(secondHit.source()).thenReturn(mock(ObjectNode.class)); + when(secondHit.version()).thenReturn(2L); hits.add(firstHit); hits.add(secondHit); @@ -567,5 +600,9 @@ void search_with_scroll_returns_expected_SearchScrollResponse() throws IOExcepti assertThat(searchScrollResponse.getDocuments(), notNullValue()); assertThat(searchScrollResponse.getDocuments().size(), equalTo(2)); assertThat(searchScrollResponse.getScrollId(), equalTo(scrollId)); + assertThat(searchScrollResponse.getDocuments().get(0), notNullValue()); + assertThat(searchScrollResponse.getDocuments().get(0).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(1L)); + assertThat(searchScrollResponse.getDocuments().get(1), notNullValue()); + assertThat(searchScrollResponse.getDocuments().get(1).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(2L)); } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java index 8555c91432..663fbaf181 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java @@ -65,6 +65,7 @@ import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.INDEX_NOT_FOUND_EXCEPTION; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.PIT_RESOURCE_LIMIT_ERROR_TYPE; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE; +import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME; @ExtendWith(MockitoExtension.class) public class OpenSearchAccessorTest { @@ -148,6 +149,10 @@ void create_scroll_returns_expected_create_scroll_response() throws IOException assertThat(createScrollResponse.getScrollId(), equalTo(scrollId)); assertThat(createScrollResponse.getDocuments(), notNullValue()); assertThat(createScrollResponse.getDocuments().size(), equalTo(2)); + + final SearchRequest searchRequest = searchRequestArgumentCaptor.getValue(); + assertThat(searchRequest, notNullValue()); + assertThat(searchRequest.version(), equalTo(true)); } @Test @@ -454,11 +459,13 @@ void search_with_pit_returns_expected_SearchPointInTimeResponse(final boolean ha when(firstHit.id()).thenReturn(UUID.randomUUID().toString()); when(firstHit.index()).thenReturn(UUID.randomUUID().toString()); when(firstHit.source()).thenReturn(mock(ObjectNode.class)); + when(firstHit.version()).thenReturn(1L); final Hit secondHit = mock(Hit.class); when(secondHit.id()).thenReturn(UUID.randomUUID().toString()); when(secondHit.index()).thenReturn(UUID.randomUUID().toString()); when(secondHit.source()).thenReturn(mock(ObjectNode.class)); + when(secondHit.version()).thenReturn(2L); when(secondHit.sort()).thenReturn(Collections.singletonList(UUID.randomUUID().toString())); hits.add(firstHit); @@ -476,8 +483,16 @@ void search_with_pit_returns_expected_SearchPointInTimeResponse(final boolean ha assertThat(searchWithSearchAfterResults, notNullValue()); assertThat(searchWithSearchAfterResults.getDocuments(), notNullValue()); assertThat(searchWithSearchAfterResults.getDocuments().size(), equalTo(2)); + assertThat(searchWithSearchAfterResults.getDocuments().get(0), notNullValue()); + assertThat(searchWithSearchAfterResults.getDocuments().get(0).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(1L)); + assertThat(searchWithSearchAfterResults.getDocuments().get(1), notNullValue()); + assertThat(searchWithSearchAfterResults.getDocuments().get(1).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(2L)); assertThat(searchWithSearchAfterResults.getNextSearchAfter(), equalTo(secondHit.sort())); + + final SearchRequest searchRequest = searchRequestArgumentCaptor.getValue(); + assertThat(searchRequest, notNullValue()); + assertThat(searchRequest.version(), equalTo(true)); } @Test @@ -496,11 +511,13 @@ void search_with_scroll_returns_expected_SearchScrollResponse() throws IOExcepti when(firstHit.id()).thenReturn(UUID.randomUUID().toString()); when(firstHit.index()).thenReturn(UUID.randomUUID().toString()); when(firstHit.source()).thenReturn(mock(ObjectNode.class)); + when(firstHit.version()).thenReturn(1L); final Hit secondHit = mock(Hit.class); when(secondHit.id()).thenReturn(UUID.randomUUID().toString()); when(secondHit.index()).thenReturn(UUID.randomUUID().toString()); when(secondHit.source()).thenReturn(mock(ObjectNode.class)); + when(secondHit.version()).thenReturn(2L); hits.add(firstHit); hits.add(secondHit); @@ -519,5 +536,9 @@ void search_with_scroll_returns_expected_SearchScrollResponse() throws IOExcepti assertThat(searchScrollResponse.getDocuments(), notNullValue()); assertThat(searchScrollResponse.getDocuments().size(), equalTo(2)); assertThat(searchScrollResponse.getScrollId(), equalTo(scrollId)); + assertThat(searchScrollResponse.getDocuments().get(0), notNullValue()); + assertThat(searchScrollResponse.getDocuments().get(0).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(1L)); + assertThat(searchScrollResponse.getDocuments().get(1), notNullValue()); + assertThat(searchScrollResponse.getDocuments().get(1).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(2L)); } }