Skip to content

Commit c11b644

Browse files
authored
Remove default codec and require codec for sqs sink (#6486)
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent ff70e84 commit c11b644

4 files changed

Lines changed: 6 additions & 46 deletions

File tree

data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSink.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.slf4j.LoggerFactory;
3535

3636
import java.time.Duration;
37-
import java.util.Map;
3837
import java.util.Collection;
3938

4039
@Experimental
@@ -60,16 +59,12 @@ public SqsSink(final PluginSetting pluginSetting,
6059
sinkInitialized = false;
6160
final PluginModel codecConfiguration = sqsSinkConfig.getCodec();
6261
final PluginSetting codecPluginSettings;
63-
if (codecConfiguration != null) {
64-
String codecPluginName = codecConfiguration.getPluginName();
65-
if (!codecPluginName.equals("json") && !codecPluginName.equals("ndjson")) {
66-
throw new RuntimeException(String.format("Codec {} not supported.", codecPluginName));
67-
}
68-
codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(),
69-
codecConfiguration.getPluginSettings());
70-
} else {
71-
codecPluginSettings = new PluginSetting("ndjson", Map.of());
62+
String codecPluginName = codecConfiguration.getPluginName();
63+
if (!codecPluginName.equals("json") && !codecPluginName.equals("ndjson")) {
64+
throw new RuntimeException(String.format("Codec {} not supported.", codecPluginName));
7265
}
66+
codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(),
67+
codecConfiguration.getPluginSettings());
7368

7469
AwsConfig awsConfig = sqsSinkConfig.getAwsConfig();
7570
final AwsCredentialsProvider awsCredentialsProvider = (awsConfig != null) ? awsCredentialsSupplier.getProvider(convertToCredentialOptions(awsConfig)) : awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder().build());

data-prepper-plugins/sqs-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkConfig.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class SqsSinkConfig {
2828
private String queueUrl;
2929

3030
@JsonProperty("codec")
31+
@NotNull
3132
private PluginModel codec;
3233

3334
@JsonProperty("threshold")
@@ -67,12 +68,5 @@ boolean isValidConfig() {
6768
return (deDupId == null);
6869
}
6970
}
70-
71-
@AssertTrue(message = "ndjson codec (default codec) doesn't support max events per message greater than 1")
72-
boolean isValidCodecConfig() {
73-
if ((codec == null || codec.getPluginName().equals("ndjson")) && thresholdConfig.getMaxEventsPerMessage() > 1)
74-
return false;
75-
return true;
76-
}
7771
}
7872

data-prepper-plugins/sqs-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkConfigTest.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -114,21 +114,6 @@ void TestValidFiFoQConfigs() throws Exception {
114114

115115
}
116116

117-
@Test
118-
void TestValidCodecConfig() throws Exception {
119-
reflectivelySetField(sqsSinkConfig, "codec", null);
120-
reflectivelySetField(sqsSinkConfig, "thresholdConfig", sqsThresholdConfig);
121-
when(sqsThresholdConfig.getMaxEventsPerMessage()).thenReturn(2);
122-
assertFalse(sqsSinkConfig.isValidCodecConfig());
123-
when(sqsThresholdConfig.getMaxEventsPerMessage()).thenReturn(1);
124-
assertTrue(sqsSinkConfig.isValidCodecConfig());
125-
PluginModel codec = mock(PluginModel.class);
126-
when(codec.getPluginName()).thenReturn("ndjson");
127-
reflectivelySetField(sqsSinkConfig, "codec", codec);
128-
when(sqsThresholdConfig.getMaxEventsPerMessage()).thenReturn(2);
129-
assertFalse(sqsSinkConfig.isValidCodecConfig());
130-
}
131-
132117
private void reflectivelySetField(final SqsSinkConfig sqsSinkConfig, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException {
133118
final Field field = SqsSinkConfig.class.getDeclaredField(fieldName);
134119
try {

data-prepper-plugins/sqs-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/sqs/SqsSinkTest.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -230,20 +230,6 @@ void TestForDefaultCodec() {
230230
}
231231
}
232232

233-
@Test
234-
void TestForNullCodec() {
235-
when(sqsSinkConfig.getCodec()).thenReturn(null);
236-
try(MockedStatic<SqsClientFactory> mockedStatic = mockStatic(SqsClientFactory.class)) {
237-
mockedStatic.when(() -> SqsClientFactory.createSqsClient(any(Region.class),
238-
any(AwsCredentialsProvider.class)))
239-
.thenReturn(sqsClient);
240-
241-
SqsSink sqsSink = createObjectUnderTest();
242-
sqsSink.doInitialize();
243-
assertTrue(sqsSink.isReady());
244-
}
245-
}
246-
247233
@Test
248234
void TestSinkOutputWithEvents() {
249235
try(MockedStatic<SqsClientFactory> mockedStatic = mockStatic(SqsClientFactory.class)) {

0 commit comments

Comments
 (0)