Skip to content

Commit d260e0e

Browse files
authored
Extract validation logic from FlintIndexMetadataServiceImpl (#2944)
Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
1 parent e1ee3b1 commit d260e0e

3 files changed

Lines changed: 179 additions & 68 deletions

File tree

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.spark.flint;
7+
8+
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.AUTO_REFRESH;
9+
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.CHECKPOINT_LOCATION;
10+
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.INCREMENTAL_REFRESH;
11+
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.WATERMARK_DELAY;
12+
13+
import java.util.ArrayList;
14+
import java.util.Arrays;
15+
import java.util.HashMap;
16+
import java.util.LinkedHashSet;
17+
import java.util.List;
18+
import java.util.Map;
19+
import java.util.Set;
20+
import org.apache.commons.lang3.StringUtils;
21+
import org.apache.logging.log4j.LogManager;
22+
import org.apache.logging.log4j.Logger;
23+
24+
public class FlintIndexMetadataValidator {
25+
private static final Logger LOGGER = LogManager.getLogger(FlintIndexMetadataValidator.class);
26+
27+
public static final Set<String> ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS =
28+
new LinkedHashSet<>(Arrays.asList(AUTO_REFRESH, INCREMENTAL_REFRESH));
29+
public static final Set<String> ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS =
30+
new LinkedHashSet<>(
31+
Arrays.asList(AUTO_REFRESH, INCREMENTAL_REFRESH, WATERMARK_DELAY, CHECKPOINT_LOCATION));
32+
33+
/**
34+
* Validate if the flint index options contain valid key/value pairs. Throws
35+
* IllegalArgumentException with description about invalid options.
36+
*/
37+
public static void validateFlintIndexOptions(
38+
String kind, Map<String, Object> existingOptions, Map<String, String> newOptions) {
39+
if ((newOptions.containsKey(INCREMENTAL_REFRESH)
40+
&& Boolean.parseBoolean(newOptions.get(INCREMENTAL_REFRESH)))
41+
|| ((!newOptions.containsKey(INCREMENTAL_REFRESH)
42+
&& Boolean.parseBoolean((String) existingOptions.get(INCREMENTAL_REFRESH))))) {
43+
validateConversionToIncrementalRefresh(kind, existingOptions, newOptions);
44+
} else {
45+
validateConversionToFullRefresh(newOptions);
46+
}
47+
}
48+
49+
private static void validateConversionToFullRefresh(Map<String, String> newOptions) {
50+
if (!ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS.containsAll(newOptions.keySet())) {
51+
throw new IllegalArgumentException(
52+
String.format(
53+
"Altering to full refresh only allows: %s options",
54+
ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS));
55+
}
56+
}
57+
58+
private static void validateConversionToIncrementalRefresh(
59+
String kind, Map<String, Object> existingOptions, Map<String, String> newOptions) {
60+
if (!ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS.containsAll(newOptions.keySet())) {
61+
throw new IllegalArgumentException(
62+
String.format(
63+
"Altering to incremental refresh only allows: %s options",
64+
ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS));
65+
}
66+
HashMap<String, Object> mergedOptions = new HashMap<>();
67+
mergedOptions.putAll(existingOptions);
68+
mergedOptions.putAll(newOptions);
69+
List<String> missingAttributes = new ArrayList<>();
70+
if (!mergedOptions.containsKey(CHECKPOINT_LOCATION)
71+
|| StringUtils.isEmpty((String) mergedOptions.get(CHECKPOINT_LOCATION))) {
72+
missingAttributes.add(CHECKPOINT_LOCATION);
73+
}
74+
if (kind.equals("mv")
75+
&& (!mergedOptions.containsKey(WATERMARK_DELAY)
76+
|| StringUtils.isEmpty((String) mergedOptions.get(WATERMARK_DELAY)))) {
77+
missingAttributes.add(WATERMARK_DELAY);
78+
}
79+
if (missingAttributes.size() > 0) {
80+
String errorMessage =
81+
"Conversion to incremental refresh index cannot proceed due to missing attributes: "
82+
+ String.join(", ", missingAttributes)
83+
+ ".";
84+
LOGGER.error(errorMessage);
85+
throw new IllegalArgumentException(errorMessage);
86+
}
87+
}
88+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.spark.flint;
7+
8+
import static org.junit.jupiter.api.Assertions.assertThrows;
9+
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.AUTO_REFRESH;
10+
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.CHECKPOINT_LOCATION;
11+
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.INCREMENTAL_REFRESH;
12+
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.WATERMARK_DELAY;
13+
14+
import com.google.common.collect.ImmutableMap;
15+
import java.util.Map;
16+
import org.junit.jupiter.api.Test;
17+
18+
class FlintIndexMetadataValidatorTest {
19+
@Test
20+
public void conversionToIncrementalRefreshWithValidOption() {
21+
Map<String, Object> existingOptions =
22+
ImmutableMap.<String, Object>builder().put(INCREMENTAL_REFRESH, "false").build();
23+
Map<String, String> newOptions =
24+
ImmutableMap.<String, String>builder()
25+
.put(INCREMENTAL_REFRESH, "true")
26+
.put(CHECKPOINT_LOCATION, "checkpoint_location")
27+
.put(WATERMARK_DELAY, "1")
28+
.build();
29+
30+
FlintIndexMetadataValidator.validateFlintIndexOptions("mv", existingOptions, newOptions);
31+
}
32+
33+
@Test
34+
public void conversionToIncrementalRefreshWithMissingOptions() {
35+
Map<String, Object> existingOptions =
36+
ImmutableMap.<String, Object>builder().put(AUTO_REFRESH, "true").build();
37+
Map<String, String> newOptions =
38+
ImmutableMap.<String, String>builder().put(INCREMENTAL_REFRESH, "true").build();
39+
40+
assertThrows(
41+
IllegalArgumentException.class,
42+
() ->
43+
FlintIndexMetadataValidator.validateFlintIndexOptions(
44+
"mv", existingOptions, newOptions));
45+
}
46+
47+
@Test
48+
public void conversionToIncrementalRefreshWithInvalidOption() {
49+
Map<String, Object> existingOptions =
50+
ImmutableMap.<String, Object>builder().put(INCREMENTAL_REFRESH, "false").build();
51+
Map<String, String> newOptions =
52+
ImmutableMap.<String, String>builder()
53+
.put(INCREMENTAL_REFRESH, "true")
54+
.put("INVALID_OPTION", "1")
55+
.build();
56+
57+
assertThrows(
58+
IllegalArgumentException.class,
59+
() ->
60+
FlintIndexMetadataValidator.validateFlintIndexOptions(
61+
"mv", existingOptions, newOptions));
62+
}
63+
64+
@Test
65+
public void conversionToFullRefreshWithValidOption() {
66+
Map<String, Object> existingOptions =
67+
ImmutableMap.<String, Object>builder().put(AUTO_REFRESH, "false").build();
68+
Map<String, String> newOptions =
69+
ImmutableMap.<String, String>builder().put(AUTO_REFRESH, "true").build();
70+
71+
FlintIndexMetadataValidator.validateFlintIndexOptions("mv", existingOptions, newOptions);
72+
}
73+
74+
@Test
75+
public void conversionToFullRefreshWithInvalidOption() {
76+
Map<String, Object> existingOptions =
77+
ImmutableMap.<String, Object>builder().put(AUTO_REFRESH, "false").build();
78+
Map<String, String> newOptions =
79+
ImmutableMap.<String, String>builder()
80+
.put(AUTO_REFRESH, "true")
81+
.put(WATERMARK_DELAY, "1")
82+
.build();
83+
84+
assertThrows(
85+
IllegalArgumentException.class,
86+
() ->
87+
FlintIndexMetadataValidator.validateFlintIndexOptions(
88+
"mv", existingOptions, newOptions));
89+
}
90+
}

async-query/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImpl.java

Lines changed: 1 addition & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,6 @@
55

66
package org.opensearch.sql.spark.flint;
77

8-
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.AUTO_REFRESH;
9-
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.CHECKPOINT_LOCATION;
10-
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.INCREMENTAL_REFRESH;
11-
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.WATERMARK_DELAY;
128
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.APP_ID;
139
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.ENV_KEY;
1410
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.KIND_KEY;
@@ -20,15 +16,9 @@
2016
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.SERVERLESS_EMR_JOB_ID;
2117
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.SOURCE_KEY;
2218

23-
import java.util.ArrayList;
24-
import java.util.Arrays;
2519
import java.util.HashMap;
26-
import java.util.LinkedHashSet;
27-
import java.util.List;
2820
import java.util.Map;
29-
import java.util.Set;
3021
import lombok.AllArgsConstructor;
31-
import org.apache.commons.lang3.StringUtils;
3222
import org.apache.logging.log4j.LogManager;
3323
import org.apache.logging.log4j.Logger;
3424
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse;
@@ -43,11 +33,6 @@ public class FlintIndexMetadataServiceImpl implements FlintIndexMetadataService
4333
private static final Logger LOGGER = LogManager.getLogger(FlintIndexMetadataServiceImpl.class);
4434

4535
private final Client client;
46-
public static final Set<String> ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS =
47-
new LinkedHashSet<>(Arrays.asList(AUTO_REFRESH, INCREMENTAL_REFRESH));
48-
public static final Set<String> ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS =
49-
new LinkedHashSet<>(
50-
Arrays.asList(AUTO_REFRESH, INCREMENTAL_REFRESH, WATERMARK_DELAY, CHECKPOINT_LOCATION));
5136

5237
@Override
5338
public Map<String, FlintIndexMetadata> getFlintIndexMetadata(
@@ -87,63 +72,11 @@ public void updateIndexToManualRefresh(
8772
String kind = (String) meta.get("kind");
8873
Map<String, Object> options = (Map<String, Object>) meta.get("options");
8974
Map<String, String> newOptions = flintIndexOptions.getProvidedOptions();
90-
validateFlintIndexOptions(kind, options, newOptions);
75+
FlintIndexMetadataValidator.validateFlintIndexOptions(kind, options, newOptions);
9176
options.putAll(newOptions);
9277
client.admin().indices().preparePutMapping(indexName).setSource(flintMetadataMap).get();
9378
}
9479

95-
private void validateFlintIndexOptions(
96-
String kind, Map<String, Object> existingOptions, Map<String, String> newOptions) {
97-
if ((newOptions.containsKey(INCREMENTAL_REFRESH)
98-
&& Boolean.parseBoolean(newOptions.get(INCREMENTAL_REFRESH)))
99-
|| ((!newOptions.containsKey(INCREMENTAL_REFRESH)
100-
&& Boolean.parseBoolean((String) existingOptions.get(INCREMENTAL_REFRESH))))) {
101-
validateConversionToIncrementalRefresh(kind, existingOptions, newOptions);
102-
} else {
103-
validateConversionToFullRefresh(newOptions);
104-
}
105-
}
106-
107-
private void validateConversionToFullRefresh(Map<String, String> newOptions) {
108-
if (!ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS.containsAll(newOptions.keySet())) {
109-
throw new IllegalArgumentException(
110-
String.format(
111-
"Altering to full refresh only allows: %s options",
112-
ALTER_TO_FULL_REFRESH_ALLOWED_OPTIONS));
113-
}
114-
}
115-
116-
private void validateConversionToIncrementalRefresh(
117-
String kind, Map<String, Object> existingOptions, Map<String, String> newOptions) {
118-
if (!ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS.containsAll(newOptions.keySet())) {
119-
throw new IllegalArgumentException(
120-
String.format(
121-
"Altering to incremental refresh only allows: %s options",
122-
ALTER_TO_INCREMENTAL_REFRESH_ALLOWED_OPTIONS));
123-
}
124-
HashMap<String, Object> mergedOptions = new HashMap<>();
125-
mergedOptions.putAll(existingOptions);
126-
mergedOptions.putAll(newOptions);
127-
List<String> missingAttributes = new ArrayList<>();
128-
if (!mergedOptions.containsKey(CHECKPOINT_LOCATION)
129-
|| StringUtils.isEmpty((String) mergedOptions.get(CHECKPOINT_LOCATION))) {
130-
missingAttributes.add(CHECKPOINT_LOCATION);
131-
}
132-
if (kind.equals("mv")
133-
&& (!mergedOptions.containsKey(WATERMARK_DELAY)
134-
|| StringUtils.isEmpty((String) mergedOptions.get(WATERMARK_DELAY)))) {
135-
missingAttributes.add(WATERMARK_DELAY);
136-
}
137-
if (missingAttributes.size() > 0) {
138-
String errorMessage =
139-
"Conversion to incremental refresh index cannot proceed due to missing attributes: "
140-
+ String.join(", ", missingAttributes)
141-
+ ".";
142-
LOGGER.error(errorMessage);
143-
throw new IllegalArgumentException(errorMessage);
144-
}
145-
}
146-
14780
private FlintIndexMetadata fromMetadata(String indexName, Map<String, Object> metaMap) {
14881
FlintIndexMetadata.FlintIndexMetadataBuilder flintIndexMetadataBuilder =
14982
FlintIndexMetadata.builder();

0 commit comments

Comments
 (0)