Skip to content

Commit 6144e46

Browse files
Add configurable sort options to opensearch source search_options (#6761)
Add configurable sort options to opensearch source search_options Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com> Co-authored-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
1 parent 74aa3cf commit 6144e46

16 files changed

Lines changed: 582 additions & 22 deletions

File tree

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfiguration.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77

88
import com.fasterxml.jackson.annotation.JsonIgnore;
99
import com.fasterxml.jackson.annotation.JsonProperty;
10+
import jakarta.validation.Valid;
1011
import jakarta.validation.constraints.AssertTrue;
1112
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
1213

14+
import java.util.List;
15+
1316
public class SearchConfiguration {
1417

1518
@JsonProperty("search_context_type")
@@ -18,6 +21,10 @@ public class SearchConfiguration {
1821
@JsonProperty("batch_size")
1922
private Integer batchSize = 1000;
2023

24+
@JsonProperty("sort")
25+
@Valid
26+
private List<SortConfig> sort;
27+
2128
@JsonIgnore
2229
private SearchContextType searchContextTypeValue;
2330

@@ -29,6 +36,10 @@ public Integer getBatchSize() {
2936
return batchSize;
3037
}
3138

39+
public List<SortConfig> getSort() {
40+
return sort;
41+
}
42+
3243
@AssertTrue(message = "search_context_type must be one of [ 'scroll', 'point_in_time', 'none' ]")
3344
boolean isSearchContextTypeValid() {
3445
try {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.opensearch.configuration;
12+
13+
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
15+
16+
public class SortConfig {
17+
18+
@JsonProperty("name")
19+
@NotBlank(message = "sort field name must not be blank")
20+
private String name;
21+
22+
@JsonProperty("order")
23+
private SortOrder order = SortOrder.ASCENDING;
24+
25+
public String getName() {
26+
return name;
27+
}
28+
29+
public SortOrder getOrder() {
30+
return order;
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.opensearch.configuration;
12+
13+
import com.fasterxml.jackson.annotation.JsonCreator;
14+
import com.fasterxml.jackson.annotation.JsonValue;
15+
16+
import java.util.Arrays;
17+
import java.util.Map;
18+
import java.util.stream.Collectors;
19+
20+
public enum SortOrder {
21+
ASCENDING("ascending", "asc"),
22+
DESCENDING("descending", "desc");
23+
24+
private static final Map<String, SortOrder> NAMES_MAP = Arrays.stream(SortOrder.values())
25+
.collect(Collectors.toMap(
26+
value -> value.optionName,
27+
value -> value
28+
));
29+
30+
private final String optionName;
31+
private final String sortOrderValue;
32+
33+
SortOrder(final String optionName, final String sortOrderValue) {
34+
this.optionName = optionName;
35+
this.sortOrderValue = sortOrderValue;
36+
}
37+
38+
@JsonValue
39+
public String getOptionName() {
40+
return optionName;
41+
}
42+
43+
public String getSortOrderValue() {
44+
return sortOrderValue;
45+
}
46+
47+
@JsonCreator
48+
public static SortOrder fromOptionName(final String optionName) {
49+
return NAMES_MAP.get(optionName);
50+
}
51+
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException;
2525
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.NoSearchContextSearchRequest;
2626
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
27+
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions;
2728
import org.slf4j.Logger;
2829
import org.slf4j.LoggerFactory;
2930

@@ -147,13 +148,14 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
147148

148149
final SearchConfiguration searchConfiguration = openSearchSourceConfiguration.getSearchConfiguration();
149150
SearchWithSearchAfterResults searchWithSearchAfterResults = null;
151+
final List<SortingOptions> sortingOptions = SortingOptions.fromSortConfigs(searchConfiguration.getSort());
150152

151-
// todo: Pass query and sort options from SearchConfiguration to the search request
152153
do {
153154
searchWithSearchAfterResults = searchAccessor.searchWithoutSearchContext(NoSearchContextSearchRequest.builder()
154155
.withIndex(indexName)
155156
.withPaginationSize(searchConfiguration.getBatchSize())
156157
.withSearchAfter(getSearchAfter(openSearchIndexProgressState, searchWithSearchAfterResults))
158+
.withSortOptions(sortingOptions)
157159
.build());
158160

159161
searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeletePointInTimeRequest;
2828
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchPointInTimeRequest;
2929
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
30+
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
3233

@@ -183,14 +184,15 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
183184

184185
final SearchConfiguration searchConfiguration = openSearchSourceConfiguration.getSearchConfiguration();
185186
SearchWithSearchAfterResults searchWithSearchAfterResults = null;
187+
final List<SortingOptions> sortingOptions = SortingOptions.fromSortConfigs(searchConfiguration.getSort());
186188

187-
// todo: Pass query and sort options from SearchConfiguration to the search request
188189
do {
189190
searchWithSearchAfterResults = searchAccessor.searchWithPit(SearchPointInTimeRequest.builder()
190191
.withPitId(openSearchIndexProgressState.getPitId())
191192
.withKeepAlive(EXTEND_KEEP_ALIVE_TIME)
192193
.withPaginationSize(searchConfiguration.getBatchSize())
193194
.withSearchAfter(getSearchAfter(openSearchIndexProgressState, searchWithSearchAfterResults))
195+
.withSortOptions(sortingOptions)
194196
.build());
195197

196198
searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.DeleteScrollRequest;
2727
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
2828
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;
29+
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions;
2930
import org.slf4j.Logger;
3031
import org.slf4j.LoggerFactory;
3132

@@ -155,11 +156,13 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
155156
LOG.info("Started processing for index: '{}'", indexName);
156157

157158
final Integer batchSize = openSearchSourceConfiguration.getSearchConfiguration().getBatchSize();
159+
final List<SortingOptions> sortingOptions = SortingOptions.fromSortConfigs(openSearchSourceConfiguration.getSearchConfiguration().getSort());
158160

159161
final CreateScrollResponse createScrollResponse = searchAccessor.createScroll(CreateScrollRequest.builder()
160162
.withScrollTime(SCROLL_TIME_PER_BATCH)
161163
.withSize(openSearchSourceConfiguration.getSearchConfiguration().getBatchSize())
162164
.withIndex(indexName)
165+
.withSortOptions(sortingOptions)
163166
.build());
164167

165168
writeDocumentsToBuffer(createScrollResponse.getDocuments(), acknowledgementSet);

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
4444
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;
4545
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
46+
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions;
4647
import org.slf4j.Logger;
4748
import org.slf4j.LoggerFactory;
4849

@@ -65,6 +66,13 @@ public class ElasticsearchAccessor implements SearchAccessor, ClusterClientFacto
6566
static final String PIT_RESOURCE_LIMIT_ERROR_TYPE = "rejected_execution_exception";
6667
static final String INDEX_NOT_FOUND_EXCEPTION = "index_not_found_exception";
6768

69+
private static final List<SortOptions> DEFAULT_SORT_OPTIONS = List.of(
70+
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
71+
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc)))));
72+
73+
private static final List<SortOptions> DEFAULT_SCROLL_SORT_OPTIONS = List.of(
74+
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))));
75+
6876
private final PluginComponentRefresher<ElasticsearchClient, OpenSearchSourceConfiguration>
6977
elasticsearchClientRefresher;
7078
private final SearchContextType searchContextType;
@@ -119,10 +127,7 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest
119127
.id(searchPointInTimeRequest.getPitId())
120128
.keepAlive(Time.of(time -> time.time(searchPointInTimeRequest.getKeepAlive())))))
121129
.size(searchPointInTimeRequest.getPaginationSize())
122-
.sort(List.of(
123-
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
124-
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSortBuilder -> fieldSortBuilder.field("_id").order(SortOrder.Asc)))))
125-
)
130+
.sort(buildSortOptions(searchPointInTimeRequest.getSortOptions()))
126131
.version(true)
127132
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));
128133

@@ -161,7 +166,7 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR
161166
searchResponse = elasticsearchClientRefresher.get()
162167
.search(SearchRequest.of(request -> request
163168
.scroll(Time.of(time -> time.time(createScrollRequest.getScrollTime())))
164-
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
169+
.sort(buildSortOptionsForScroll(createScrollRequest.getSortOptions()))
165170
.size(createScrollRequest.getSize())
166171
.version(true)
167172
.index(createScrollRequest.getIndex())), ObjectNode.class);
@@ -232,10 +237,7 @@ public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchCon
232237
builder
233238
.index(noSearchContextSearchRequest.getIndex())
234239
.size(noSearchContextSearchRequest.getPaginationSize())
235-
.sort(List.of(
236-
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
237-
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSortBuilder -> fieldSortBuilder.field("_id").order(SortOrder.Asc)))))
238-
)
240+
.sort(buildSortOptions(noSearchContextSearchRequest.getSortOptions()))
239241
.version(true)
240242
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));
241243

@@ -306,4 +308,22 @@ private List<Event> getDocumentsFromResponse(final SearchResponse<ObjectNode> se
306308
.withEventType(EventType.DOCUMENT.toString()).build())
307309
.collect(Collectors.toList());
308310
}
311+
312+
private List<SortOptions> buildSortOptions(final List<SortingOptions> sortingOptions) {
313+
if (sortingOptions == null || sortingOptions.isEmpty()) {
314+
return DEFAULT_SORT_OPTIONS;
315+
}
316+
return sortingOptions.stream()
317+
.map(opt -> SortOptions.of(b -> b.field(
318+
FieldSort.of(f -> f.field(opt.getFieldName())
319+
.order("desc".equalsIgnoreCase(opt.getOrder()) ? SortOrder.Desc : SortOrder.Asc)))))
320+
.collect(Collectors.toList());
321+
}
322+
323+
private List<SortOptions> buildSortOptionsForScroll(final List<SortingOptions> sortingOptions) {
324+
if (sortingOptions == null || sortingOptions.isEmpty()) {
325+
return DEFAULT_SCROLL_SORT_OPTIONS;
326+
}
327+
return buildSortOptions(sortingOptions);
328+
}
309329
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollRequest;
4444
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchScrollResponse;
4545
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchWithSearchAfterResults;
46+
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SortingOptions;
4647
import org.slf4j.Logger;
4748
import org.slf4j.LoggerFactory;
4849

@@ -66,6 +67,13 @@ public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory<
6667
static final String INDEX_NOT_FOUND_EXCEPTION = "index_not_found_exception";
6768
static final String SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE = "Trying to create too many scroll contexts";
6869

70+
private static final List<SortOptions> DEFAULT_SORT_OPTIONS = List.of(
71+
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
72+
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc)))));
73+
74+
private static final List<SortOptions> DEFAULT_SCROLL_SORT_OPTIONS = List.of(
75+
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))));
76+
6977
private final PluginComponentRefresher<OpenSearchClient, OpenSearchSourceConfiguration> clientRefresher;
7078
private final SearchContextType searchContextType;
7179

@@ -117,10 +125,7 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest
117125
builder
118126
.pit(Pit.of(pitBuilder -> pitBuilder.id(searchPointInTimeRequest.getPitId()).keepAlive(searchPointInTimeRequest.getKeepAlive())))
119127
.size(searchPointInTimeRequest.getPaginationSize())
120-
.sort(List.of(
121-
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
122-
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc)))))
123-
)
128+
.sort(buildSortOptions(searchPointInTimeRequest.getSortOptions()))
124129
.version(true)
125130
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));
126131

@@ -157,7 +162,7 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR
157162
searchResponse = clientRefresher.get()
158163
.search(SearchRequest.of(request -> request
159164
.scroll(Time.of(time -> time.time(createScrollRequest.getScrollTime())))
160-
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
165+
.sort(buildSortOptionsForScroll(createScrollRequest.getSortOptions()))
161166
.size(createScrollRequest.getSize())
162167
.version(true)
163168
.index(createScrollRequest.getIndex())), ObjectNode.class);
@@ -226,10 +231,7 @@ public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchCon
226231
builder
227232
.index(noSearchContextSearchRequest.getIndex())
228233
.size(noSearchContextSearchRequest.getPaginationSize())
229-
.sort(List.of(
230-
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
231-
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc)))))
232-
)
234+
.sort(buildSortOptions(noSearchContextSearchRequest.getSortOptions()))
233235
.version(true)
234236
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));
235237

@@ -305,4 +307,22 @@ private List<Event> getDocumentsFromResponse(final SearchResponse<ObjectNode> se
305307
.withEventType(EventType.DOCUMENT.toString()).build())
306308
.collect(Collectors.toList());
307309
}
310+
311+
private List<SortOptions> buildSortOptions(final List<SortingOptions> sortingOptions) {
312+
if (sortingOptions == null || sortingOptions.isEmpty()) {
313+
return DEFAULT_SORT_OPTIONS;
314+
}
315+
return sortingOptions.stream()
316+
.map(opt -> SortOptions.of(b -> b.field(
317+
FieldSort.of(f -> f.field(opt.getFieldName())
318+
.order("desc".equalsIgnoreCase(opt.getOrder()) ? SortOrder.Desc : SortOrder.Asc)))))
319+
.collect(Collectors.toList());
320+
}
321+
322+
private List<SortOptions> buildSortOptionsForScroll(final List<SortingOptions> sortingOptions) {
323+
if (sortingOptions == null || sortingOptions.isEmpty()) {
324+
return DEFAULT_SCROLL_SORT_OPTIONS;
325+
}
326+
return buildSortOptions(sortingOptions);
327+
}
308328
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/CreateScrollRequest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44
*/
55
package org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model;
66

7+
import java.util.List;
8+
79
public class CreateScrollRequest {
810

911
private final String index;
1012
private final String scrollTime;
1113
private final Integer size;
14+
private final List<SortingOptions> sortingOptions;
1215

1316
public String getIndex() {
1417
return index;
@@ -18,10 +21,13 @@ public String getIndex() {
1821

1922
public String getScrollTime() { return scrollTime; }
2023

24+
public List<SortingOptions> getSortOptions() { return sortingOptions; }
25+
2126
private CreateScrollRequest(final CreateScrollRequest.Builder builder) {
2227
this.index = builder.index;
2328
this.size = builder.size;
2429
this.scrollTime = builder.scrollTime;
30+
this.sortingOptions = builder.sortingOptions;
2531
}
2632

2733
public static CreateScrollRequest.Builder builder() {
@@ -33,6 +39,7 @@ public static class Builder {
3339
private String index;
3440
private Integer size;
3541
private String scrollTime;
42+
private List<SortingOptions> sortingOptions;
3643

3744
public Builder() {
3845

@@ -53,6 +60,11 @@ public CreateScrollRequest.Builder withScrollTime(final String scrollTime) {
5360
return this;
5461
}
5562

63+
public CreateScrollRequest.Builder withSortOptions(final List<SortingOptions> sortingOptions) {
64+
this.sortingOptions = sortingOptions;
65+
return this;
66+
}
67+
5668
public CreateScrollRequest build() {
5769
return new CreateScrollRequest(this);
5870
}

0 commit comments

Comments
 (0)