Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
}
} while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize());

LOG.info("Received {} documents in latest search request, and batch size is {}, exiting pagination",
searchWithSearchAfterResults.getDocuments().size(), searchConfiguration.getBatchSize());

try {
bufferAccumulator.flush();
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
}
} while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize());

LOG.info("Received {} documents in latest search request, and batch size is {}, exiting pagination",
searchWithSearchAfterResults.getDocuments().size(), searchConfiguration.getBatchSize());

try {
bufferAccumulator.flush();
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
} while (searchScrollResponse.getDocuments().size() == batchSize);
}

LOG.info("Received {} documents in latest search request, and batch size is {}, exiting pagination",
searchScrollResponse.getDocuments().size(), batchSize);

deleteScroll(createScrollResponse.getScrollId());

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME;

public class ElasticsearchAccessor implements SearchAccessor, ClusterClientFactory<ElasticsearchClient> {
Expand Down Expand Up @@ -122,6 +123,7 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSortBuilder -> fieldSortBuilder.field("_id").order(SortOrder.Asc)))))
)
.version(true)
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));

if (Objects.nonNull(searchPointInTimeRequest.getSearchAfter())) {
Expand Down Expand Up @@ -161,6 +163,7 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR
.scroll(Time.of(time -> time.time(createScrollRequest.getScrollTime())))
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
.size(createScrollRequest.getSize())
.version(true)
.index(createScrollRequest.getIndex())), ObjectNode.class);
} catch (final ElasticsearchException e) {
if (isDueToNoIndexFound(e)) {
Expand Down Expand Up @@ -233,6 +236,7 @@ public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchCon
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSortBuilder -> fieldSortBuilder.field("_id").order(SortOrder.Asc)))))
)
.version(true)
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));

if (Objects.nonNull(noSearchContextSearchRequest.getSearchAfter())) {
Expand Down Expand Up @@ -295,7 +299,10 @@ private List<Event> getDocumentsFromResponse(final SearchResponse<ObjectNode> se
return searchResponse.hits().hits().stream()
.map(hit -> JacksonEvent.builder()
.withData(hit.source())
.withEventMetadataAttributes(Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(), INDEX_METADATA_ATTRIBUTE_NAME, hit.index()))
.withEventMetadataAttributes(
Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(),
INDEX_METADATA_ATTRIBUTE_NAME, hit.index(),
DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME, hit.version()))
.withEventType(EventType.DOCUMENT.toString()).build())
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME;

public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory<OpenSearchClient> {
Expand Down Expand Up @@ -120,6 +121,7 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc)))))
)
.version(true)
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));

if (Objects.nonNull(searchPointInTimeRequest.getSearchAfter())) {
Expand Down Expand Up @@ -157,6 +159,7 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR
.scroll(Time.of(time -> time.time(createScrollRequest.getScrollTime())))
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
.size(createScrollRequest.getSize())
.version(true)
.index(createScrollRequest.getIndex())), ObjectNode.class);
} catch (final OpenSearchException e) {
if (isDueToNoIndexFound(e)) {
Expand Down Expand Up @@ -227,6 +230,7 @@ public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchCon
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc)))))
)
.version(true)
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));

if (Objects.nonNull(noSearchContextSearchRequest.getSearchAfter())) {
Expand Down Expand Up @@ -294,7 +298,10 @@ private List<Event> getDocumentsFromResponse(final SearchResponse<ObjectNode> se
return searchResponse.hits().hits().stream()
.map(hit -> JacksonEvent.builder()
.withData(hit.source())
.withEventMetadataAttributes(Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(), INDEX_METADATA_ATTRIBUTE_NAME, hit.index()))
.withEventMetadataAttributes(
Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(),
INDEX_METADATA_ATTRIBUTE_NAME, hit.index(),
DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME, hit.version()))
.withEventType(EventType.DOCUMENT.toString()).build())
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
public class MetadataKeyAttributes {
public static final String DOCUMENT_ID_METADATA_ATTRIBUTE_NAME = "opensearch-document_id";
public static final String INDEX_METADATA_ATTRIBUTE_NAME = "opensearch-index";
public static final String DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME = "opensearch_document_version";
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.INDEX_NOT_FOUND_EXCEPTION;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.PIT_RESOURCE_LIMIT_ERROR_TYPE;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME;

@ExtendWith(MockitoExtension.class)
public class ElasticsearchAccessorTest {
Expand Down Expand Up @@ -127,11 +128,13 @@ void create_scroll_returns_expected_create_scroll_response() throws IOException
when(firstHit.id()).thenReturn(UUID.randomUUID().toString());
when(firstHit.index()).thenReturn(UUID.randomUUID().toString());
when(firstHit.source()).thenReturn(mock(ObjectNode.class));
when(firstHit.version()).thenReturn(1L);

final Hit<ObjectNode> secondHit = mock(Hit.class);
when(secondHit.id()).thenReturn(UUID.randomUUID().toString());
when(secondHit.index()).thenReturn(UUID.randomUUID().toString());
when(secondHit.source()).thenReturn(mock(ObjectNode.class));
when(secondHit.version()).thenReturn(2L);

hits.add(firstHit);
hits.add(secondHit);
Expand All @@ -148,6 +151,14 @@ void create_scroll_returns_expected_create_scroll_response() throws IOException
assertThat(createScrollResponse.getScrollId(), equalTo(scrollId));
assertThat(createScrollResponse.getDocuments(), notNullValue());
assertThat(createScrollResponse.getDocuments().size(), equalTo(2));
assertThat(createScrollResponse.getDocuments().get(0), notNullValue());
assertThat(createScrollResponse.getDocuments().get(0).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(1L));
assertThat(createScrollResponse.getDocuments().get(1), notNullValue());
assertThat(createScrollResponse.getDocuments().get(1).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(2L));

final SearchRequest searchRequest = searchRequestArgumentCaptor.getValue();
assertThat(searchRequest, notNullValue());
assertThat(searchRequest.version(), equalTo(true));
}

@Test
Expand Down Expand Up @@ -452,12 +463,14 @@ void search_with_pit_returns_expected_SearchPointInTimeResponse(final boolean ha
when(firstHit.id()).thenReturn(UUID.randomUUID().toString());
when(firstHit.index()).thenReturn(UUID.randomUUID().toString());
when(firstHit.source()).thenReturn(mock(ObjectNode.class));
when(firstHit.version()).thenReturn(1L);

final Hit<ObjectNode> secondHit = mock(Hit.class);
when(secondHit.id()).thenReturn(UUID.randomUUID().toString());
when(secondHit.index()).thenReturn(UUID.randomUUID().toString());
when(secondHit.source()).thenReturn(mock(ObjectNode.class));
when(secondHit.sort()).thenReturn(searchAfter);
when(secondHit.version()).thenReturn(2L);

hits.add(firstHit);
hits.add(secondHit);
Expand All @@ -476,6 +489,14 @@ void search_with_pit_returns_expected_SearchPointInTimeResponse(final boolean ha
assertThat(searchWithSearchAfterResults.getDocuments().size(), equalTo(2));

assertThat(searchWithSearchAfterResults.getNextSearchAfter(), equalTo(secondHit.sort()));
assertThat(searchWithSearchAfterResults.getDocuments().get(0), notNullValue());
assertThat(searchWithSearchAfterResults.getDocuments().get(0).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(1L));
assertThat(searchWithSearchAfterResults.getDocuments().get(1), notNullValue());
assertThat(searchWithSearchAfterResults.getDocuments().get(1).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(2L));

final SearchRequest searchRequest = searchRequestArgumentCaptor.getValue();
assertThat(searchRequest, notNullValue());
assertThat(searchRequest.version(), equalTo(true));
}

@ParameterizedTest
Expand All @@ -502,12 +523,14 @@ void search_without_search_context_returns_expected_SearchPointInTimeResponse(fi
when(firstHit.id()).thenReturn(UUID.randomUUID().toString());
when(firstHit.index()).thenReturn(index);
when(firstHit.source()).thenReturn(mock(ObjectNode.class));
when(firstHit.version()).thenReturn(1L);

final Hit<ObjectNode> secondHit = mock(Hit.class);
when(secondHit.id()).thenReturn(UUID.randomUUID().toString());
when(secondHit.index()).thenReturn(index);
when(secondHit.source()).thenReturn(mock(ObjectNode.class));
when(secondHit.sort()).thenReturn(searchAfter);
when(secondHit.version()).thenReturn(2L);

hits.add(firstHit);
hits.add(secondHit);
Expand All @@ -524,8 +547,16 @@ void search_without_search_context_returns_expected_SearchPointInTimeResponse(fi
assertThat(searchWithSearchAfterResults, notNullValue());
assertThat(searchWithSearchAfterResults.getDocuments(), notNullValue());
assertThat(searchWithSearchAfterResults.getDocuments().size(), equalTo(2));
assertThat(searchWithSearchAfterResults.getDocuments().get(0), notNullValue());
assertThat(searchWithSearchAfterResults.getDocuments().get(0).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(1L));
assertThat(searchWithSearchAfterResults.getDocuments().get(1), notNullValue());
assertThat(searchWithSearchAfterResults.getDocuments().get(1).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(2L));

assertThat(searchWithSearchAfterResults.getNextSearchAfter(), equalTo(secondHit.sort()));

final SearchRequest searchRequest = searchRequestArgumentCaptor.getValue();
assertThat(searchRequest, notNullValue());
assertThat(searchRequest.version(), equalTo(true));
}

@Test
Expand All @@ -544,11 +575,13 @@ void search_with_scroll_returns_expected_SearchScrollResponse() throws IOExcepti
when(firstHit.id()).thenReturn(UUID.randomUUID().toString());
when(firstHit.index()).thenReturn(UUID.randomUUID().toString());
when(firstHit.source()).thenReturn(mock(ObjectNode.class));
when(firstHit.version()).thenReturn(1L);

final Hit<ObjectNode> secondHit = mock(Hit.class);
when(secondHit.id()).thenReturn(UUID.randomUUID().toString());
when(secondHit.index()).thenReturn(UUID.randomUUID().toString());
when(secondHit.source()).thenReturn(mock(ObjectNode.class));
when(secondHit.version()).thenReturn(2L);

hits.add(firstHit);
hits.add(secondHit);
Expand All @@ -567,5 +600,9 @@ void search_with_scroll_returns_expected_SearchScrollResponse() throws IOExcepti
assertThat(searchScrollResponse.getDocuments(), notNullValue());
assertThat(searchScrollResponse.getDocuments().size(), equalTo(2));
assertThat(searchScrollResponse.getScrollId(), equalTo(scrollId));
assertThat(searchScrollResponse.getDocuments().get(0), notNullValue());
assertThat(searchScrollResponse.getDocuments().get(0).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(1L));
assertThat(searchScrollResponse.getDocuments().get(1), notNullValue());
assertThat(searchScrollResponse.getDocuments().get(1).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(2L));
}
}
Loading
Loading