1414import org .apache .kafka .clients .consumer .ConsumerRecord ;
1515import org .apache .kafka .clients .consumer .ConsumerRecords ;
1616import org .apache .kafka .clients .consumer .KafkaConsumer ;
17- import org .apache .kafka .common .config .SaslConfigs ;
18- import org .apache .kafka .common .security .auth .SecurityProtocol ;
1917import org .apache .kafka .common .serialization .StringDeserializer ;
2018import org .apache .kafka .connect .json .JsonDeserializer ;
21- import org .hamcrest . CoreMatchers ;
19+ import org .junit . jupiter . api . AfterEach ;
2220import org .junit .jupiter .api .BeforeEach ;
2321import org .junit .jupiter .api .Test ;
24- import org .mockito .Mock ;
2522import org .opensearch .dataprepper .aws .api .AwsCredentialsSupplier ;
2623import org .opensearch .dataprepper .expression .ExpressionEvaluator ;
2724import org .opensearch .dataprepper .metrics .PluginMetrics ;
3431import org .opensearch .dataprepper .model .sink .SinkContext ;
3532import org .opensearch .dataprepper .plugins .dlq .DlqProvider ;
3633import org .opensearch .dataprepper .plugins .dlq .DlqWriter ;
37- import org .opensearch .dataprepper .plugins .kafka .configuration .AuthConfig ;
38- import org .opensearch .dataprepper .plugins .kafka .configuration .PlainTextAuthConfig ;
34+ import org .opensearch .dataprepper .plugins .kafka .configuration .EncryptionConfig ;
35+ import org .opensearch .dataprepper .plugins .kafka .configuration .EncryptionType ;
3936import org .opensearch .dataprepper .plugins .kafka .configuration .TopicProducerConfig ;
4037import org .opensearch .dataprepper .plugins .kafka .util .MessageFormat ;
4138
39+ import java .time .Duration ;
4240import java .util .ArrayList ;
43- import java .util .Arrays ;
4441import java .util .Collections ;
4542import java .util .HashMap ;
4643import java .util .List ;
5047import java .util .concurrent .TimeUnit ;
5148import java .util .concurrent .atomic .AtomicBoolean ;
5249
50+ import static org .awaitility .Awaitility .await ;
51+ import static org .hamcrest .CoreMatchers .equalTo ;
5352import static org .hamcrest .MatcherAssert .assertThat ;
5453import static org .mockito .ArgumentMatchers .any ;
5554import static org .mockito .ArgumentMatchers .anyString ;
5857
5958public class KafkaSinkJsonTypeIT {
6059 private static final int TEST_ID = 123456 ;
61- @ Mock
62- private KafkaSinkConfig kafkaSinkConfig ;
6360
64- @ Mock
61+ private KafkaSinkConfig kafkaSinkConfig ;
6562 private TopicProducerConfig topicConfig ;
66-
6763 private KafkaSink kafkaSink ;
68-
6964 private String bootstrapServers ;
7065 private String testTopic ;
71-
7266 private PluginSetting pluginSetting ;
73-
74- @ Mock
7567 private PluginFactory pluginFactory ;
76-
77- @ Mock
7868 private PluginMetrics pluginMetrics ;
79-
8069 private SinkContext sinkContext ;
81-
82- @ Mock
8370 private DlqProvider dlqProvider ;
84-
85- @ Mock
8671 private DlqWriter dlqWriter ;
87-
88- @ Mock
8972 private ExpressionEvaluator evaluator ;
90-
91- @ Mock
9273 private AwsCredentialsSupplier awsCredentialsSupplier ;
93-
94- private PlainTextAuthConfig plainTextAuthConfig ;
95- private AuthConfig .SaslAuthConfig saslAuthConfig ;
96- private AuthConfig authConfig ;
97-
98- private static final Properties props = new Properties ();
99-
74+ private EncryptionConfig encryptionConfig ;
75+ private Properties props ;
10076
10177 public KafkaSink createObjectUnderTest () {
10278 return new KafkaSink (pluginSetting , kafkaSinkConfig , pluginFactory , pluginMetrics , evaluator , sinkContext , awsCredentialsSupplier );
10379 }
10480
10581 @ BeforeEach
10682 public void setup () {
107- plainTextAuthConfig = mock ( PlainTextAuthConfig . class );
108- saslAuthConfig = mock (AuthConfig . SaslAuthConfig .class );
109- authConfig = mock ( AuthConfig . class );
83+ props = new Properties ( );
84+ encryptionConfig = mock (EncryptionConfig .class );
85+ when ( encryptionConfig . getType ()). thenReturn ( EncryptionType . NONE );
11086
11187 evaluator = mock (ExpressionEvaluator .class );
11288 dlqWriter = mock (DlqWriter .class );
11389 dlqProvider = mock (DlqProvider .class );
11490 sinkContext = mock (SinkContext .class );
11591 pluginFactory = mock (PluginFactory .class );
11692 pluginSetting = mock (PluginSetting .class );
93+ pluginMetrics = mock (PluginMetrics .class );
94+ awsCredentialsSupplier = mock (AwsCredentialsSupplier .class );
11795 when (pluginSetting .getName ()).thenReturn ("name" );
11896 when (pluginSetting .getPipelineName ()).thenReturn ("pipelinename" );
11997
@@ -124,34 +102,39 @@ public void setup() {
124102 when (kafkaSinkConfig .getSchemaConfig ()).thenReturn (null );
125103 when (kafkaSinkConfig .getSerdeFormat ()).thenReturn (MessageFormat .JSON .toString ());
126104 when (kafkaSinkConfig .getPartitionKey ()).thenReturn ("test-${name}" );
105+ when (kafkaSinkConfig .getEncryptionConfig ()).thenReturn (encryptionConfig );
106+ when (kafkaSinkConfig .getAuthConfig ()).thenReturn (null );
127107
128108 testTopic = "TestTopic_" + RandomStringUtils .randomAlphabetic (5 );
129109
130110 topicConfig = mock (TopicProducerConfig .class );
131111 when (topicConfig .getName ()).thenReturn (testTopic );
112+ when (topicConfig .getSerdeFormat ()).thenReturn (MessageFormat .JSON );
113+ when (topicConfig .isCreateTopic ()).thenReturn (false );
114+ when (kafkaSinkConfig .getTopic ()).thenReturn (topicConfig );
115+
132116 bootstrapServers = System .getProperty ("tests.kafka.bootstrap_servers" );
133117 when (kafkaSinkConfig .getBootstrapServers ()).thenReturn (Collections .singletonList (bootstrapServers ));
134118 props .put (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers );
135119 }
136120
137- @ Test
138- public void TestPollRecordsJsonSASLPlainText () throws Exception {
139- configureJasConfForSASLPlainText ();
121+ @ AfterEach
122+ public void tearDown () {
123+ try (AdminClient adminClient = AdminClient .create (props )) {
124+ adminClient .deleteTopics (Collections .singleton (testTopic ))
125+ .all ().whenComplete ((v , throwable ) -> {});
126+ }
127+ }
140128
129+ @ Test
130+ public void TestPollRecordsJson () throws Exception {
141131 final int numRecords = 1 ;
142- when (topicConfig .isCreateTopic ()).thenReturn (false );
143- when (kafkaSinkConfig .getTopic ()).thenReturn (topicConfig );
144- when (kafkaSinkConfig .getAuthConfig ()).thenReturn (authConfig );
145132 kafkaSink = createObjectUnderTest ();
146133
147-
148134 AtomicBoolean created = new AtomicBoolean (false );
149- final String topicName = topicConfig .getName ();
150-
151- createTopic (created , topicName );
135+ createTopic (created , testTopic );
152136
153137 final List <Record <Event >> records = new ArrayList <>();
154-
155138 for (int i = 0 ; i < numRecords ; i ++) {
156139 final Map <String , String > eventData = new HashMap <>();
157140 eventData .put ("name" , "testName" );
@@ -163,11 +146,7 @@ public void TestPollRecordsJsonSASLPlainText() throws Exception {
163146 kafkaSink .doInitialize ();
164147 kafkaSink .doOutput (records );
165148
166- Thread .sleep (4000 );
167-
168- consumeTestMessages (records );
169-
170- deleteTopic (created , topicName );
149+ consumeAndVerifyMessages (records );
171150 }
172151
173152 private void createTopic (AtomicBoolean created , String topicName ) throws InterruptedException {
@@ -181,76 +160,34 @@ private void createTopic(AtomicBoolean created, String topicName) throws Interru
181160 }
182161 created .set (true );
183162 }
184- while (created .get () != true ) {
185- Thread .sleep (1000 );
186- }
187- }
188-
189- private void deleteTopic (AtomicBoolean created , String topicName ) throws InterruptedException {
190- try (AdminClient adminClient = AdminClient .create (props )) {
191- try {
192- adminClient .deleteTopics (Collections .singleton (topicName ))
193- .all ().get (30 , TimeUnit .SECONDS );
194- } catch (Exception e ) {
195- throw new RuntimeException (e );
196- }
197- created .set (false );
198- }
199- while (created .get () != false ) {
200- Thread .sleep (1000 );
201- }
202- }
203-
204- private void configureJasConfForSASLPlainText () {
205- String username = System .getProperty ("tests.kafka.authconfig.username" );
206- String password = System .getProperty ("tests.kafka.authconfig.password" );
207- when (plainTextAuthConfig .getUsername ()).thenReturn (username );
208- when (plainTextAuthConfig .getPassword ()).thenReturn (password );
209- when (saslAuthConfig .getPlainTextAuthConfig ()).thenReturn (plainTextAuthConfig );
210- when (authConfig .getSaslAuthConfig ()).thenReturn (saslAuthConfig );
211-
212- String jasConf = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\" " + username + "\" password=\" " + password + "\" ;" ;
213- props .put (SaslConfigs .SASL_JAAS_CONFIG , jasConf );
214- props .put (SaslConfigs .SASL_MECHANISM , "PLAIN" );
215- props .put ("security.protocol" , SecurityProtocol .SASL_PLAINTEXT .toString ());
163+ await ().atMost (Duration .ofSeconds (30 )).until (created ::get );
216164 }
217165
218- private void consumeTestMessages (List <Record <Event >> recList ) {
166+ private void consumeAndVerifyMessages (List <Record <Event >> expectedRecords ) {
219167 final String testGroup = "TestGroup_" + RandomStringUtils .randomAlphabetic (5 );
220168 props .put (ConsumerConfig .GROUP_ID_CONFIG , testGroup );
221- props .put (ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG ,
222- StringDeserializer .class );
223- props .put (ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG ,
224- JsonDeserializer .class );
225-
226- KafkaConsumer <String , JsonNode > kafkaConsumer = new KafkaConsumer <>(props );
227-
228- kafkaConsumer .subscribe (Arrays .asList (topicConfig .getName ()));
169+ props .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" );
170+ props .put (ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class );
171+ props .put (ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , JsonDeserializer .class );
229172
230- pollRecords ( recList , kafkaConsumer );
231- }
173+ try ( KafkaConsumer < String , JsonNode > kafkaConsumer = new KafkaConsumer <>( props )) {
174+ kafkaConsumer . subscribe ( Collections . singletonList ( testTopic ));
232175
233- private void pollRecords (List <Record <Event >> recList , KafkaConsumer <String , JsonNode > kafkaConsumer ) {
234- int recListCounter = 0 ;
235- boolean isPollNext = true ;
236- while (isPollNext ) {
237- ConsumerRecords <String , JsonNode > records = kafkaConsumer .poll (1000 );
238- if (!records .isEmpty () && records .count () > 0 ) {
176+ List <JsonNode > consumed = new ArrayList <>();
177+ await ().atMost (Duration .ofSeconds (30 )).untilAsserted (() -> {
178+ ConsumerRecords <String , JsonNode > records = kafkaConsumer .poll (Duration .ofSeconds (2 ));
239179 for (ConsumerRecord <String , JsonNode > record : records ) {
240- Record <Event > recordEvent = recList .get (recListCounter );
241- String inputJsonStr = recordEvent .getData ().toJsonString ();
242-
243- JsonNode recValue = record .value ();
244- String ss = recValue .asText ();
245-
246- assertThat (ss , CoreMatchers .containsString (inputJsonStr ));
247- if (recListCounter + 1 == recList .size ()) {
248- isPollNext = false ;
249- }
250- recListCounter ++;
251- break ;
180+ consumed .add (record .value ());
252181 }
182+ assertThat (consumed .size (), equalTo (expectedRecords .size ()));
183+ });
184+
185+ for (int i = 0 ; i < expectedRecords .size (); i ++) {
186+ Event expectedEvent = expectedRecords .get (i ).getData ();
187+ JsonNode actual = consumed .get (i );
188+ assertThat (actual .get ("name" ).asText (), equalTo (expectedEvent .get ("name" , String .class )));
189+ assertThat (actual .get ("id" ).asText (), equalTo (expectedEvent .get ("id" , String .class )));
253190 }
254191 }
255192 }
256- }
193+ }
0 commit comments