Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;

import java.util.List;

public class SearchConfiguration {

@JsonProperty("search_context_type")
Expand All @@ -18,6 +21,10 @@ public class SearchConfiguration {
@JsonProperty("batch_size")
private Integer batchSize = 1000;

@JsonProperty("sort")
@Valid
private List<SortConfig> sort;

@JsonIgnore
private SearchContextType searchContextTypeValue;

Expand All @@ -29,6 +36,10 @@ public Integer getBatchSize() {
return batchSize;
}

public List<SortConfig> getSort() {
return sort;
}

@AssertTrue(message = "search_context_type must be one of [ 'scroll', 'point_in_time', 'none' ]")
boolean isSearchContextTypeValid() {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.opensearch.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotBlank;

public class SortConfig {

@JsonProperty("name")
@NotBlank(message = "sort field name must not be blank")
private String name;

@JsonProperty("order")
private SortOrder order = SortOrder.ASCENDING;

public String getName() {
return name;
}

public SortOrder getOrder() {
return order;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.opensearch.configuration;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public enum SortOrder {
ASCENDING("ascending", "asc"),
DESCENDING("descending", "desc");

private static final Map<String, SortOrder> NAMES_MAP = Arrays.stream(SortOrder.values())
.collect(Collectors.toMap(
value -> value.optionName,
value -> value
));

private final String optionName;
private final String sortOrderValue;

SortOrder(final String optionName, final String sortOrderValue) {
this.optionName = optionName;
this.sortOrderValue = sortOrderValue;
}

@JsonValue
public String getOptionName() {
return optionName;
}

public String getSortOrderValue() {
return sortOrderValue;
}

@JsonCreator
public static SortOrder fromOptionName(final String optionName) {
return NAMES_MAP.get(optionName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.NoSearchContextSearchRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -147,13 +148,14 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op

final SearchConfiguration searchConfiguration = openSearchSourceConfiguration.getSearchConfiguration();
SearchWithSearchAfterResults searchWithSearchAfterResults = null;
final List<SortingOptions> sortingOptions = SortingOptions.fromSortConfigs(searchConfiguration.getSort());

// todo: Pass query and sort options from SearchConfiguration to the search request
do {
searchWithSearchAfterResults = searchAccessor.searchWithoutSearchContext(NoSearchContextSearchRequest.builder()
.withIndex(indexName)
.withPaginationSize(searchConfiguration.getBatchSize())
.withSearchAfter(getSearchAfter(openSearchIndexProgressState, searchWithSearchAfterResults))
.withSortOptions(sortingOptions)
.build());

searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -183,14 +184,15 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op

final SearchConfiguration searchConfiguration = openSearchSourceConfiguration.getSearchConfiguration();
SearchWithSearchAfterResults searchWithSearchAfterResults = null;
final List<SortingOptions> sortingOptions = SortingOptions.fromSortConfigs(searchConfiguration.getSort());

// todo: Pass query and sort options from SearchConfiguration to the search request
do {
searchWithSearchAfterResults = searchAccessor.searchWithPit(SearchPointInTimeRequest.builder()
.withPitId(openSearchIndexProgressState.getPitId())
.withKeepAlive(EXTEND_KEEP_ALIVE_TIME)
.withPaginationSize(searchConfiguration.getBatchSize())
.withSearchAfter(getSearchAfter(openSearchIndexProgressState, searchWithSearchAfterResults))
.withSortOptions(sortingOptions)
.build());

searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -155,11 +156,13 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
LOG.info("Started processing for index: '{}'", indexName);

final Integer batchSize = openSearchSourceConfiguration.getSearchConfiguration().getBatchSize();
final List<SortingOptions> sortingOptions = SortingOptions.fromSortConfigs(openSearchSourceConfiguration.getSearchConfiguration().getSort());

final CreateScrollResponse createScrollResponse = searchAccessor.createScroll(CreateScrollRequest.builder()
.withScrollTime(SCROLL_TIME_PER_BATCH)
.withSize(openSearchSourceConfiguration.getSearchConfiguration().getBatchSize())
.withIndex(indexName)
.withSortOptions(sortingOptions)
.build());

writeDocumentsToBuffer(createScrollResponse.getDocuments(), acknowledgementSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -65,6 +66,13 @@ public class ElasticsearchAccessor implements SearchAccessor, ClusterClientFacto
static final String PIT_RESOURCE_LIMIT_ERROR_TYPE = "rejected_execution_exception";
static final String INDEX_NOT_FOUND_EXCEPTION = "index_not_found_exception";

private static final List<SortOptions> DEFAULT_SORT_OPTIONS = List.of(
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc)))));

private static final List<SortOptions> DEFAULT_SCROLL_SORT_OPTIONS = List.of(
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))));

private final PluginComponentRefresher<ElasticsearchClient, OpenSearchSourceConfiguration>
elasticsearchClientRefresher;
private final SearchContextType searchContextType;
Expand Down Expand Up @@ -119,10 +127,7 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest
.id(searchPointInTimeRequest.getPitId())
.keepAlive(Time.of(time -> time.time(searchPointInTimeRequest.getKeepAlive())))))
.size(searchPointInTimeRequest.getPaginationSize())
.sort(List.of(
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSortBuilder -> fieldSortBuilder.field("_id").order(SortOrder.Asc)))))
)
.sort(buildSortOptions(searchPointInTimeRequest.getSortOptions()))
.version(true)
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));

Expand Down Expand Up @@ -161,7 +166,7 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR
searchResponse = elasticsearchClientRefresher.get()
.search(SearchRequest.of(request -> request
.scroll(Time.of(time -> time.time(createScrollRequest.getScrollTime())))
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
.sort(buildSortOptionsForScroll(createScrollRequest.getSortOptions()))
.size(createScrollRequest.getSize())
.version(true)
.index(createScrollRequest.getIndex())), ObjectNode.class);
Expand Down Expand Up @@ -232,10 +237,7 @@ public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchCon
builder
.index(noSearchContextSearchRequest.getIndex())
.size(noSearchContextSearchRequest.getPaginationSize())
.sort(List.of(
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSortBuilder -> fieldSortBuilder.field("_id").order(SortOrder.Asc)))))
)
.sort(buildSortOptions(noSearchContextSearchRequest.getSortOptions()))
.version(true)
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));

Expand Down Expand Up @@ -306,4 +308,22 @@ private List<Event> getDocumentsFromResponse(final SearchResponse<ObjectNode> se
.withEventType(EventType.DOCUMENT.toString()).build())
.collect(Collectors.toList());
}

private List<SortOptions> buildSortOptions(final List<SortingOptions> sortingOptions) {
if (sortingOptions == null || sortingOptions.isEmpty()) {
return DEFAULT_SORT_OPTIONS;
}
return sortingOptions.stream()
.map(opt -> SortOptions.of(b -> b.field(
FieldSort.of(f -> f.field(opt.getFieldName())
.order("desc".equalsIgnoreCase(opt.getOrder()) ? SortOrder.Desc : SortOrder.Asc)))))
.collect(Collectors.toList());
}

private List<SortOptions> buildSortOptionsForScroll(final List<SortingOptions> sortingOptions) {
if (sortingOptions == null || sortingOptions.isEmpty()) {
return DEFAULT_SCROLL_SORT_OPTIONS;
}
return buildSortOptions(sortingOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -66,6 +67,13 @@ public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory<
static final String INDEX_NOT_FOUND_EXCEPTION = "index_not_found_exception";
static final String SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE = "Trying to create too many scroll contexts";

private static final List<SortOptions> DEFAULT_SORT_OPTIONS = List.of(
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc)))));

private static final List<SortOptions> DEFAULT_SCROLL_SORT_OPTIONS = List.of(
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))));

private final PluginComponentRefresher<OpenSearchClient, OpenSearchSourceConfiguration> clientRefresher;
private final SearchContextType searchContextType;

Expand Down Expand Up @@ -117,10 +125,7 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest
builder
.pit(Pit.of(pitBuilder -> pitBuilder.id(searchPointInTimeRequest.getPitId()).keepAlive(searchPointInTimeRequest.getKeepAlive())))
.size(searchPointInTimeRequest.getPaginationSize())
.sort(List.of(
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc)))))
)
.sort(buildSortOptions(searchPointInTimeRequest.getSortOptions()))
.version(true)
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));

Expand Down Expand Up @@ -157,7 +162,7 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR
searchResponse = clientRefresher.get()
.search(SearchRequest.of(request -> request
.scroll(Time.of(time -> time.time(createScrollRequest.getScrollTime())))
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
.sort(buildSortOptionsForScroll(createScrollRequest.getSortOptions()))
.size(createScrollRequest.getSize())
.version(true)
.index(createScrollRequest.getIndex())), ObjectNode.class);
Expand Down Expand Up @@ -226,10 +231,7 @@ public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchCon
builder
.index(noSearchContextSearchRequest.getIndex())
.size(noSearchContextSearchRequest.getPaginationSize())
.sort(List.of(
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc)))))
)
.sort(buildSortOptions(noSearchContextSearchRequest.getSortOptions()))
.version(true)
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));

Expand Down Expand Up @@ -305,4 +307,22 @@ private List<Event> getDocumentsFromResponse(final SearchResponse<ObjectNode> se
.withEventType(EventType.DOCUMENT.toString()).build())
.collect(Collectors.toList());
}

private List<SortOptions> buildSortOptions(final List<SortingOptions> sortingOptions) {
if (sortingOptions == null || sortingOptions.isEmpty()) {
return DEFAULT_SORT_OPTIONS;
}
return sortingOptions.stream()
.map(opt -> SortOptions.of(b -> b.field(
FieldSort.of(f -> f.field(opt.getFieldName())
.order("desc".equalsIgnoreCase(opt.getOrder()) ? SortOrder.Desc : SortOrder.Asc)))))
.collect(Collectors.toList());
}

private List<SortOptions> buildSortOptionsForScroll(final List<SortingOptions> sortingOptions) {
if (sortingOptions == null || sortingOptions.isEmpty()) {
return DEFAULT_SCROLL_SORT_OPTIONS;
}
return buildSortOptions(sortingOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model;

import java.util.List;

public class CreateScrollRequest {

private final String index;
private final String scrollTime;
private final Integer size;
private final List<SortingOptions> sortingOptions;

public String getIndex() {
return index;
Expand All @@ -18,10 +21,13 @@ public String getIndex() {

public String getScrollTime() { return scrollTime; }

public List<SortingOptions> getSortOptions() { return sortingOptions; }

private CreateScrollRequest(final CreateScrollRequest.Builder builder) {
this.index = builder.index;
this.size = builder.size;
this.scrollTime = builder.scrollTime;
this.sortingOptions = builder.sortingOptions;
}

public static CreateScrollRequest.Builder builder() {
Expand All @@ -33,6 +39,7 @@ public static class Builder {
private String index;
private Integer size;
private String scrollTime;
private List<SortingOptions> sortingOptions;

public Builder() {

Expand All @@ -53,6 +60,11 @@ public CreateScrollRequest.Builder withScrollTime(final String scrollTime) {
return this;
}

public CreateScrollRequest.Builder withSortOptions(final List<SortingOptions> sortingOptions) {
this.sortingOptions = sortingOptions;
return this;
}

public CreateScrollRequest build() {
return new CreateScrollRequest(this);
}
Expand Down
Loading
Loading