diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfiguration.java index aeec292c84..be10c5b7d6 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfiguration.java @@ -7,9 +7,12 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; import jakarta.validation.constraints.AssertTrue; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType; +import java.util.List; + public class SearchConfiguration { @JsonProperty("search_context_type") @@ -18,6 +21,10 @@ public class SearchConfiguration { @JsonProperty("batch_size") private Integer batchSize = 1000; + @JsonProperty("sort") + @Valid + private List sort; + @JsonIgnore private SearchContextType searchContextTypeValue; @@ -29,6 +36,10 @@ public Integer getBatchSize() { return batchSize; } + public List getSort() { + return sort; + } + @AssertTrue(message = "search_context_type must be one of [ 'scroll', 'point_in_time', 'none' ]") boolean isSearchContextTypeValid() { try { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SortConfig.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SortConfig.java new file mode 100644 index 0000000000..f8ed95f9cf --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SortConfig.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotBlank; + +public class SortConfig { + + @JsonProperty("name") + @NotBlank(message = "sort field name must not be blank") + private String name; + + @JsonProperty("order") + private SortOrder order = SortOrder.ASCENDING; + + public String getName() { + return name; + } + + public SortOrder getOrder() { + return order; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SortOrder.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SortOrder.java new file mode 100644 index 0000000000..9c1c9b0c77 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SortOrder.java @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public enum SortOrder { + ASCENDING("ascending", "asc"), + DESCENDING("descending", "desc"); + + private static final Map NAMES_MAP = Arrays.stream(SortOrder.values()) + .collect(Collectors.toMap( + value -> value.optionName, + value -> value + )); + + private final String optionName; + private final String sortOrderValue; + + SortOrder(final String optionName, final String sortOrderValue) { + this.optionName = optionName; + this.sortOrderValue = sortOrderValue; + } + + @JsonValue + public String getOptionName() { + return optionName; + } + + public String getSortOrderValue() { + return sortOrderValue; + } + + @JsonCreator + public static SortOrder fromOptionName(final String optionName) { + return NAMES_MAP.get(optionName); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java index 651fd4edac..090293691f 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java @@ -24,6 +24,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.NoSearchContextSearchRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,13 +148,14 @@ private void processIndex(final SourcePartition op final SearchConfiguration searchConfiguration = openSearchSourceConfiguration.getSearchConfiguration(); SearchWithSearchAfterResults searchWithSearchAfterResults = null; + final List sortingOptions = SortingOptions.fromSortConfigs(searchConfiguration.getSort()); - // todo: Pass query and sort options from SearchConfiguration to the search request do { searchWithSearchAfterResults = searchAccessor.searchWithoutSearchContext(NoSearchContextSearchRequest.builder() .withIndex(indexName) .withPaginationSize(searchConfiguration.getBatchSize()) .withSearchAfter(getSearchAfter(openSearchIndexProgressState, searchWithSearchAfterResults)) + .withSortOptions(sortingOptions) .build()); searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java index 9e54b4cbcc..f30694487f 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java @@ -27,6 +27,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -183,14 +184,15 @@ private void processIndex(final SourcePartition op final SearchConfiguration searchConfiguration = openSearchSourceConfiguration.getSearchConfiguration(); SearchWithSearchAfterResults searchWithSearchAfterResults = null; + final List sortingOptions = SortingOptions.fromSortConfigs(searchConfiguration.getSort()); - // todo: Pass query and sort options from SearchConfiguration to the search request do { searchWithSearchAfterResults = searchAccessor.searchWithPit(SearchPointInTimeRequest.builder() .withPitId(openSearchIndexProgressState.getPitId()) .withKeepAlive(EXTEND_KEEP_ALIVE_TIME) .withPaginationSize(searchConfiguration.getBatchSize()) .withSearchAfter(getSearchAfter(openSearchIndexProgressState, searchWithSearchAfterResults)) + .withSortOptions(sortingOptions) .build()); searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java index 3153c308db..b303013f09 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java @@ -26,6 +26,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -155,11 +156,13 @@ private void processIndex(final SourcePartition op LOG.info("Started processing for index: '{}'", indexName); final Integer batchSize = openSearchSourceConfiguration.getSearchConfiguration().getBatchSize(); + final List sortingOptions = SortingOptions.fromSortConfigs(openSearchSourceConfiguration.getSearchConfiguration().getSort()); final CreateScrollResponse createScrollResponse = searchAccessor.createScroll(CreateScrollRequest.builder() .withScrollTime(SCROLL_TIME_PER_BATCH) .withSize(openSearchSourceConfiguration.getSearchConfiguration().getBatchSize()) .withIndex(indexName) + .withSortOptions(sortingOptions) .build()); writeDocumentsToBuffer(createScrollResponse.getDocuments(), acknowledgementSet); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java index 5a6fc23db7..939359d953 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java @@ -43,6 +43,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +66,13 @@ public class ElasticsearchAccessor implements SearchAccessor, ClusterClientFacto static final String PIT_RESOURCE_LIMIT_ERROR_TYPE = "rejected_execution_exception"; static final String INDEX_NOT_FOUND_EXCEPTION = "index_not_found_exception"; + private static final List DEFAULT_SORT_OPTIONS = List.of( + SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))), + SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc))))); + + private static final List DEFAULT_SCROLL_SORT_OPTIONS = List.of( + SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc))))); + private final PluginComponentRefresher elasticsearchClientRefresher; private final SearchContextType searchContextType; @@ -119,10 +127,7 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest .id(searchPointInTimeRequest.getPitId()) .keepAlive(Time.of(time -> time.time(searchPointInTimeRequest.getKeepAlive()))))) .size(searchPointInTimeRequest.getPaginationSize()) - .sort(List.of( - SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))), - SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSortBuilder -> fieldSortBuilder.field("_id").order(SortOrder.Asc))))) - ) + .sort(buildSortOptions(searchPointInTimeRequest.getSortOptions())) .version(true) .query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery)))); @@ -161,7 +166,7 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR searchResponse = elasticsearchClientRefresher.get() .search(SearchRequest.of(request -> request .scroll(Time.of(time -> time.time(createScrollRequest.getScrollTime()))) - .sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc))))) + .sort(buildSortOptionsForScroll(createScrollRequest.getSortOptions())) .size(createScrollRequest.getSize()) .version(true) .index(createScrollRequest.getIndex())), ObjectNode.class); @@ -232,10 +237,7 @@ public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchCon builder .index(noSearchContextSearchRequest.getIndex()) .size(noSearchContextSearchRequest.getPaginationSize()) - .sort(List.of( - SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))), - SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSortBuilder -> fieldSortBuilder.field("_id").order(SortOrder.Asc))))) - ) + .sort(buildSortOptions(noSearchContextSearchRequest.getSortOptions())) .version(true) .query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery)))); @@ -306,4 +308,22 @@ private List getDocumentsFromResponse(final SearchResponse se .withEventType(EventType.DOCUMENT.toString()).build()) .collect(Collectors.toList()); } + + private List buildSortOptions(final List sortingOptions) { + if (sortingOptions == null || sortingOptions.isEmpty()) { + return DEFAULT_SORT_OPTIONS; + } + return sortingOptions.stream() + .map(opt -> SortOptions.of(b -> b.field( + FieldSort.of(f -> f.field(opt.getFieldName()) + .order("desc".equalsIgnoreCase(opt.getOrder()) ? SortOrder.Desc : SortOrder.Asc))))) + .collect(Collectors.toList()); + } + + private List buildSortOptionsForScroll(final List sortingOptions) { + if (sortingOptions == null || sortingOptions.isEmpty()) { + return DEFAULT_SCROLL_SORT_OPTIONS; + } + return buildSortOptions(sortingOptions); + } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java index bc76b4de12..ae60b73657 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java @@ -43,6 +43,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +67,13 @@ public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory< static final String INDEX_NOT_FOUND_EXCEPTION = "index_not_found_exception"; static final String SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE = "Trying to create too many scroll contexts"; + private static final List DEFAULT_SORT_OPTIONS = List.of( + SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))), + SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc))))); + + private static final List DEFAULT_SCROLL_SORT_OPTIONS = List.of( + SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc))))); + private final PluginComponentRefresher clientRefresher; private final SearchContextType searchContextType; @@ -117,10 +125,7 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest builder .pit(Pit.of(pitBuilder -> pitBuilder.id(searchPointInTimeRequest.getPitId()).keepAlive(searchPointInTimeRequest.getKeepAlive()))) .size(searchPointInTimeRequest.getPaginationSize()) - .sort(List.of( - SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))), - SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc))))) - ) + .sort(buildSortOptions(searchPointInTimeRequest.getSortOptions())) .version(true) .query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery)))); @@ -157,7 +162,7 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR searchResponse = clientRefresher.get() .search(SearchRequest.of(request -> request .scroll(Time.of(time -> time.time(createScrollRequest.getScrollTime()))) - .sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc))))) + .sort(buildSortOptionsForScroll(createScrollRequest.getSortOptions())) .size(createScrollRequest.getSize()) .version(true) .index(createScrollRequest.getIndex())), ObjectNode.class); @@ -226,10 +231,7 @@ public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchCon builder .index(noSearchContextSearchRequest.getIndex()) .size(noSearchContextSearchRequest.getPaginationSize()) - .sort(List.of( - SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))), - SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc))))) - ) + .sort(buildSortOptions(noSearchContextSearchRequest.getSortOptions())) .version(true) .query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery)))); @@ -305,4 +307,22 @@ private List getDocumentsFromResponse(final SearchResponse se .withEventType(EventType.DOCUMENT.toString()).build()) .collect(Collectors.toList()); } + + private List buildSortOptions(final List sortingOptions) { + if (sortingOptions == null || sortingOptions.isEmpty()) { + return DEFAULT_SORT_OPTIONS; + } + return sortingOptions.stream() + .map(opt -> SortOptions.of(b -> b.field( + FieldSort.of(f -> f.field(opt.getFieldName()) + .order("desc".equalsIgnoreCase(opt.getOrder()) ? SortOrder.Desc : SortOrder.Asc))))) + .collect(Collectors.toList()); + } + + private List buildSortOptionsForScroll(final List sortingOptions) { + if (sortingOptions == null || sortingOptions.isEmpty()) { + return DEFAULT_SCROLL_SORT_OPTIONS; + } + return buildSortOptions(sortingOptions); + } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollRequest.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollRequest.java index e48c09c1bb..aaf4ee84b2 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollRequest.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollRequest.java @@ -4,11 +4,14 @@ */ package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; +import java.util.List; + public class CreateScrollRequest { private final String index; private final String scrollTime; private final Integer size; + private final List sortingOptions; public String getIndex() { return index; @@ -18,10 +21,13 @@ public String getIndex() { public String getScrollTime() { return scrollTime; } + public List getSortOptions() { return sortingOptions; } + private CreateScrollRequest(final CreateScrollRequest.Builder builder) { this.index = builder.index; this.size = builder.size; this.scrollTime = builder.scrollTime; + this.sortingOptions = builder.sortingOptions; } public static CreateScrollRequest.Builder builder() { @@ -33,6 +39,7 @@ public static class Builder { private String index; private Integer size; private String scrollTime; + private List sortingOptions; public Builder() { @@ -53,6 +60,11 @@ public CreateScrollRequest.Builder withScrollTime(final String scrollTime) { return this; } + public CreateScrollRequest.Builder withSortOptions(final List sortingOptions) { + this.sortingOptions = sortingOptions; + return this; + } + public CreateScrollRequest build() { return new CreateScrollRequest(this); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/NoSearchContextSearchRequest.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/NoSearchContextSearchRequest.java index 12ee48c330..c34b1dbbf6 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/NoSearchContextSearchRequest.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/NoSearchContextSearchRequest.java @@ -12,11 +12,13 @@ public class NoSearchContextSearchRequest { private final String index; private final List searchAfter; private final Integer paginationSize; + private final List sortingOptions; private NoSearchContextSearchRequest(final NoSearchContextSearchRequest.Builder builder) { this.index = builder.index; this.searchAfter = builder.searchAfter; this.paginationSize = builder.paginationSize; + this.sortingOptions = builder.sortingOptions; } public static NoSearchContextSearchRequest.Builder builder() { @@ -35,11 +37,16 @@ public Integer getPaginationSize() { return paginationSize; } + public List getSortOptions() { + return sortingOptions; + } + public static class Builder { private String index; private List searchAfter; private Integer paginationSize; + private List sortingOptions; public Builder() { @@ -61,6 +68,11 @@ public NoSearchContextSearchRequest.Builder withIndex(final String index) { return this; } + public NoSearchContextSearchRequest.Builder withSortOptions(final List sortingOptions) { + this.sortingOptions = sortingOptions; + return this; + } + public NoSearchContextSearchRequest build() { return new NoSearchContextSearchRequest(this); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SortingOptions.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SortingOptions.java index d69ee9850f..4214b41a32 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SortingOptions.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SortingOptions.java @@ -6,10 +6,18 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; import com.fasterxml.jackson.annotation.JsonProperty; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SortConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; -// TODO: Convert the queryMap sort value from SearchConfiguration to this class to be passed to SearchWithPit or SearchWithScroll public class SortingOptions { + private static final Logger LOG = LoggerFactory.getLogger(SortingOptions.class); + private String fieldName; @JsonProperty("order") @@ -36,4 +44,29 @@ public String getFormat() { public String getMode() { return mode; } -} + + public static List fromSortConfigs(final List sortConfigs) { + if (sortConfigs == null || sortConfigs.isEmpty()) { + return Collections.emptyList(); + } + final List result = sortConfigs.stream() + .map(SortingOptions::fromSortConfig) + .collect(Collectors.toList()); + + final boolean hasIdField = result.stream() + .anyMatch(opt -> "_id".equals(opt.getFieldName())); + if (!hasIdField) { + LOG.warn("Custom sort does not include _id. Pagination results may contain duplicates " + + "or miss documents when sort values are not unique."); + } + + return result; + } + + private static SortingOptions fromSortConfig(final SortConfig sortConfig) { + final SortingOptions sortingOptions = new SortingOptions(); + sortingOptions.fieldName = sortConfig.getName(); + sortingOptions.order = sortConfig.getOrder().getSortOrderValue(); + return sortingOptions; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfigurationTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfigurationTest.java index af61f1cc9f..6412c2bffe 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfigurationTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfigurationTest.java @@ -11,9 +11,11 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -28,6 +30,7 @@ void default_search_configuration() { assertThat(searchConfiguration.getBatchSize(), equalTo(1000)); assertThat(searchConfiguration.getSearchContextType(), nullValue()); + assertThat(searchConfiguration.getSort(), nullValue()); } @Test @@ -56,4 +59,37 @@ void search_context_type_invalid() { assertThat(searchConfiguration.isSearchContextTypeValid(), equalTo(false)); assertThat(searchConfiguration.getSearchContextType(), nullValue()); } + + @Test + void search_configuration_with_sort_fields() { + final Map pluginSettings = new HashMap<>(); + pluginSettings.put("sort", List.of( + Map.of("name", "@timestamp", "order", "descending"), + Map.of("name", "_id", "order", "ascending") + )); + + final SearchConfiguration searchConfiguration = objectMapper.convertValue(pluginSettings, SearchConfiguration.class); + + assertThat(searchConfiguration.getSort(), notNullValue()); + assertThat(searchConfiguration.getSort().size(), equalTo(2)); + assertThat(searchConfiguration.getSort().get(0).getName(), equalTo("@timestamp")); + assertThat(searchConfiguration.getSort().get(0).getOrder(), equalTo(SortOrder.DESCENDING)); + assertThat(searchConfiguration.getSort().get(1).getName(), equalTo("_id")); + assertThat(searchConfiguration.getSort().get(1).getOrder(), equalTo(SortOrder.ASCENDING)); + } + + @Test + void search_configuration_with_single_sort_field_defaults_to_ascending() { + final Map pluginSettings = new HashMap<>(); + pluginSettings.put("sort", List.of( + Map.of("name", "created_at") + )); + + final SearchConfiguration searchConfiguration = objectMapper.convertValue(pluginSettings, SearchConfiguration.class); + + assertThat(searchConfiguration.getSort(), notNullValue()); + assertThat(searchConfiguration.getSort().size(), equalTo(1)); + assertThat(searchConfiguration.getSort().get(0).getName(), equalTo("created_at")); + assertThat(searchConfiguration.getSort().get(0).getOrder(), equalTo(SortOrder.ASCENDING)); + } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SortConfigTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SortConfigTest.java new file mode 100644 index 0000000000..f420916622 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SortConfigTest.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.configuration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +class SortConfigTest { + + private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); + + @Test + void default_order_is_ascending() throws Exception { + final String yaml = "name: \"@timestamp\"\n"; + + final SortConfig sortConfig = objectMapper.readValue(yaml, SortConfig.class); + + assertThat(sortConfig, notNullValue()); + assertThat(sortConfig.getName(), equalTo("@timestamp")); + assertThat(sortConfig.getOrder(), equalTo(SortOrder.ASCENDING)); + } + + @Test + void deserialization_with_descending_order() throws Exception { + final String yaml = "name: \"@timestamp\"\norder: descending\n"; + + final SortConfig sortConfig = objectMapper.readValue(yaml, SortConfig.class); + + assertThat(sortConfig, notNullValue()); + assertThat(sortConfig.getName(), equalTo("@timestamp")); + assertThat(sortConfig.getOrder(), equalTo(SortOrder.DESCENDING)); + } + + @Test + void deserialization_with_ascending_order() throws Exception { + final String yaml = "name: \"_id\"\norder: ascending\n"; + + final SortConfig sortConfig = objectMapper.readValue(yaml, SortConfig.class); + + assertThat(sortConfig, notNullValue()); + assertThat(sortConfig.getName(), equalTo("_id")); + assertThat(sortConfig.getOrder(), equalTo(SortOrder.ASCENDING)); + } + + @Test + void invalid_order_deserializes_to_null() throws Exception { + final String yaml = "name: \"@timestamp\"\norder: invalid\n"; + + final SortConfig sortConfig = objectMapper.readValue(yaml, SortConfig.class); + + assertThat(sortConfig.getOrder(), nullValue()); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessorTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessorTest.java index 405e8d16e0..c50dd258c9 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessorTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessorTest.java @@ -46,6 +46,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions; import java.io.IOException; import java.time.Instant; @@ -605,4 +606,82 @@ void search_with_scroll_returns_expected_SearchScrollResponse() throws IOExcepti assertThat(searchScrollResponse.getDocuments().get(1), notNullValue()); assertThat(searchScrollResponse.getDocuments().get(1).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(2L)); } + + @Test + void search_with_pit_with_custom_sort_options_uses_configured_sort() throws IOException { + final String pitId = UUID.randomUUID().toString(); + final Integer paginationSize = new Random().nextInt(); + + final SortingOptions timestampSort = mock(SortingOptions.class); + when(timestampSort.getFieldName()).thenReturn("@timestamp"); + when(timestampSort.getOrder()).thenReturn("desc"); + + final SearchPointInTimeRequest searchPointInTimeRequest = mock(SearchPointInTimeRequest.class); + when(searchPointInTimeRequest.getPitId()).thenReturn(pitId); + when(searchPointInTimeRequest.getPaginationSize()).thenReturn(paginationSize); + when(searchPointInTimeRequest.getSearchAfter()).thenReturn(null); + when(searchPointInTimeRequest.getKeepAlive()).thenReturn("5m"); + when(searchPointInTimeRequest.getSortOptions()).thenReturn(List.of(timestampSort)); + + final SearchResponse searchResponse = mock(SearchResponse.class); + final HitsMetadata hitsMetadata = mock(HitsMetadata.class); + final Hit hit = mock(Hit.class); + when(hit.id()).thenReturn(UUID.randomUUID().toString()); + when(hit.index()).thenReturn(UUID.randomUUID().toString()); + when(hit.source()).thenReturn(mock(ObjectNode.class)); + when(hit.version()).thenReturn(1L); + when(hit.sort()).thenReturn(Collections.singletonList("2024-01-01T00:00:00Z")); + when(hitsMetadata.hits()).thenReturn(List.of(hit)); + when(searchResponse.hits()).thenReturn(hitsMetadata); + + final ArgumentCaptor searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class); + when(elasticSearchClient.search(searchRequestArgumentCaptor.capture(), eq(ObjectNode.class))).thenReturn(searchResponse); + + final SearchWithSearchAfterResults results = createObjectUnderTest().searchWithPit(searchPointInTimeRequest); + + assertThat(results, notNullValue()); + assertThat(results.getDocuments().size(), equalTo(1)); + + final SearchRequest searchRequest = searchRequestArgumentCaptor.getValue(); + assertThat(searchRequest.sort().size(), equalTo(1)); + assertThat(searchRequest.sort().get(0).field().field(), equalTo("@timestamp")); + } + + @Test + void search_without_search_context_with_custom_sort_options_uses_configured_sort() throws IOException { + final Integer paginationSize = new Random().nextInt(); + final String index = UUID.randomUUID().toString(); + + final SortingOptions timestampSort = mock(SortingOptions.class); + when(timestampSort.getFieldName()).thenReturn("@timestamp"); + when(timestampSort.getOrder()).thenReturn("asc"); + + final NoSearchContextSearchRequest noSearchContextSearchRequest = mock(NoSearchContextSearchRequest.class); + when(noSearchContextSearchRequest.getPaginationSize()).thenReturn(paginationSize); + when(noSearchContextSearchRequest.getIndex()).thenReturn(index); + when(noSearchContextSearchRequest.getSearchAfter()).thenReturn(null); + when(noSearchContextSearchRequest.getSortOptions()).thenReturn(List.of(timestampSort)); + + final SearchResponse searchResponse = mock(SearchResponse.class); + final HitsMetadata hitsMetadata = mock(HitsMetadata.class); + final Hit hit = mock(Hit.class); + when(hit.id()).thenReturn(UUID.randomUUID().toString()); + when(hit.index()).thenReturn(UUID.randomUUID().toString()); + when(hit.source()).thenReturn(mock(ObjectNode.class)); + when(hit.version()).thenReturn(1L); + when(hit.sort()).thenReturn(Collections.singletonList("2024-01-01T00:00:00Z")); + when(hitsMetadata.hits()).thenReturn(List.of(hit)); + when(searchResponse.hits()).thenReturn(hitsMetadata); + + final ArgumentCaptor searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class); + when(elasticSearchClient.search(searchRequestArgumentCaptor.capture(), eq(ObjectNode.class))).thenReturn(searchResponse); + + final SearchWithSearchAfterResults results = createObjectUnderTest().searchWithoutSearchContext(noSearchContextSearchRequest); + + assertThat(results, notNullValue()); + + final SearchRequest searchRequest = searchRequestArgumentCaptor.getValue(); + assertThat(searchRequest.sort().size(), equalTo(1)); + assertThat(searchRequest.sort().get(0).field().field(), equalTo("@timestamp")); + } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java index 663fbaf181..a83735e677 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessorTest.java @@ -46,6 +46,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults; +import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions; import java.io.IOException; import java.util.ArrayList; @@ -541,4 +542,81 @@ void search_with_scroll_returns_expected_SearchScrollResponse() throws IOExcepti assertThat(searchScrollResponse.getDocuments().get(1), notNullValue()); assertThat(searchScrollResponse.getDocuments().get(1).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(2L)); } + + @Test + void search_with_pit_with_custom_sort_options_uses_configured_sort() throws IOException { + final String pitId = UUID.randomUUID().toString(); + final Integer paginationSize = new Random().nextInt(); + + final SortingOptions timestampSort = mock(SortingOptions.class); + when(timestampSort.getFieldName()).thenReturn("@timestamp"); + when(timestampSort.getOrder()).thenReturn("desc"); + + final SearchPointInTimeRequest searchPointInTimeRequest = mock(SearchPointInTimeRequest.class); + when(searchPointInTimeRequest.getPitId()).thenReturn(pitId); + when(searchPointInTimeRequest.getPaginationSize()).thenReturn(paginationSize); + when(searchPointInTimeRequest.getSearchAfter()).thenReturn(null); + when(searchPointInTimeRequest.getSortOptions()).thenReturn(List.of(timestampSort)); + + final SearchResponse searchResponse = mock(SearchResponse.class); + final HitsMetadata hitsMetadata = mock(HitsMetadata.class); + final Hit hit = mock(Hit.class); + when(hit.id()).thenReturn(UUID.randomUUID().toString()); + when(hit.index()).thenReturn(UUID.randomUUID().toString()); + when(hit.source()).thenReturn(mock(ObjectNode.class)); + when(hit.version()).thenReturn(1L); + when(hit.sort()).thenReturn(Collections.singletonList("2024-01-01T00:00:00Z")); + when(hitsMetadata.hits()).thenReturn(List.of(hit)); + when(searchResponse.hits()).thenReturn(hitsMetadata); + + final ArgumentCaptor searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class); + when(openSearchClient.search(searchRequestArgumentCaptor.capture(), eq(ObjectNode.class))).thenReturn(searchResponse); + + final SearchWithSearchAfterResults results = createObjectUnderTest().searchWithPit(searchPointInTimeRequest); + + assertThat(results, notNullValue()); + assertThat(results.getDocuments().size(), equalTo(1)); + + final SearchRequest searchRequest = searchRequestArgumentCaptor.getValue(); + assertThat(searchRequest.sort().size(), equalTo(1)); + assertThat(searchRequest.sort().get(0).field().field(), equalTo("@timestamp")); + } + + @Test + void search_without_search_context_with_custom_sort_options_uses_configured_sort() throws IOException { + final Integer paginationSize = new Random().nextInt(); + final String index = UUID.randomUUID().toString(); + + final SortingOptions timestampSort = mock(SortingOptions.class); + when(timestampSort.getFieldName()).thenReturn("@timestamp"); + when(timestampSort.getOrder()).thenReturn("asc"); + + final NoSearchContextSearchRequest noSearchContextSearchRequest = mock(NoSearchContextSearchRequest.class); + when(noSearchContextSearchRequest.getPaginationSize()).thenReturn(paginationSize); + when(noSearchContextSearchRequest.getIndex()).thenReturn(index); + when(noSearchContextSearchRequest.getSearchAfter()).thenReturn(null); + when(noSearchContextSearchRequest.getSortOptions()).thenReturn(List.of(timestampSort)); + + final SearchResponse searchResponse = mock(SearchResponse.class); + final HitsMetadata hitsMetadata = mock(HitsMetadata.class); + final Hit hit = mock(Hit.class); + when(hit.id()).thenReturn(UUID.randomUUID().toString()); + when(hit.index()).thenReturn(UUID.randomUUID().toString()); + when(hit.source()).thenReturn(mock(ObjectNode.class)); + when(hit.version()).thenReturn(1L); + when(hit.sort()).thenReturn(Collections.singletonList("2024-01-01T00:00:00Z")); + when(hitsMetadata.hits()).thenReturn(List.of(hit)); + when(searchResponse.hits()).thenReturn(hitsMetadata); + + final ArgumentCaptor searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class); + when(openSearchClient.search(searchRequestArgumentCaptor.capture(), eq(ObjectNode.class))).thenReturn(searchResponse); + + final SearchWithSearchAfterResults results = createObjectUnderTest().searchWithoutSearchContext(noSearchContextSearchRequest); + + assertThat(results, notNullValue()); + + final SearchRequest searchRequest = searchRequestArgumentCaptor.getValue(); + assertThat(searchRequest.sort().size(), equalTo(1)); + assertThat(searchRequest.sort().get(0).field().field(), equalTo("@timestamp")); + } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SortingOptionsTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SortingOptionsTest.java new file mode 100644 index 0000000000..6014f4c8f4 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/SortingOptionsTest.java @@ -0,0 +1,101 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SortConfig; +import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SortOrder; + +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class SortingOptionsTest { + + @Test + void fromSortConfigs_with_null_returns_empty_list() { + final List result = SortingOptions.fromSortConfigs(null); + + assertThat(result, notNullValue()); + assertThat(result, empty()); + } + + @Test + void fromSortConfigs_with_empty_list_returns_empty_list() { + final List result = SortingOptions.fromSortConfigs(Collections.emptyList()); + + assertThat(result, notNullValue()); + assertThat(result, empty()); + } + + @Test + void fromSortConfigs_with_ascending_order() { + final SortConfig sortConfig = mock(SortConfig.class); + when(sortConfig.getName()).thenReturn("@timestamp"); + when(sortConfig.getOrder()).thenReturn(SortOrder.ASCENDING); + + final List result = SortingOptions.fromSortConfigs(List.of(sortConfig)); + + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0).getFieldName(), equalTo("@timestamp")); + assertThat(result.get(0).getOrder(), equalTo("asc")); + } + + @Test + void fromSortConfigs_with_descending_order() { + final SortConfig sortConfig = mock(SortConfig.class); + when(sortConfig.getName()).thenReturn("@timestamp"); + when(sortConfig.getOrder()).thenReturn(SortOrder.DESCENDING); + + final List result = SortingOptions.fromSortConfigs(List.of(sortConfig)); + + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0).getFieldName(), equalTo("@timestamp")); + assertThat(result.get(0).getOrder(), equalTo("desc")); + } + + @Test + void fromSortConfigs_with_multiple_configs_including_id() { + final SortConfig timestampConfig = mock(SortConfig.class); + when(timestampConfig.getName()).thenReturn("@timestamp"); + when(timestampConfig.getOrder()).thenReturn(SortOrder.DESCENDING); + + final SortConfig idConfig = mock(SortConfig.class); + when(idConfig.getName()).thenReturn("_id"); + when(idConfig.getOrder()).thenReturn(SortOrder.ASCENDING); + + final List result = SortingOptions.fromSortConfigs(List.of(timestampConfig, idConfig)); + + assertThat(result.size(), equalTo(2)); + assertThat(result.get(0).getFieldName(), equalTo("@timestamp")); + assertThat(result.get(0).getOrder(), equalTo("desc")); + assertThat(result.get(1).getFieldName(), equalTo("_id")); + assertThat(result.get(1).getOrder(), equalTo("asc")); + } + + @Test + void fromSortConfigs_without_id_logs_warning() { + final SortConfig timestampConfig = mock(SortConfig.class); + when(timestampConfig.getName()).thenReturn("@timestamp"); + when(timestampConfig.getOrder()).thenReturn(SortOrder.DESCENDING); + + final List result = SortingOptions.fromSortConfigs(List.of(timestampConfig)); + + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0).getFieldName(), equalTo("@timestamp")); + } +} \ No newline at end of file