diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java index 7587c3172b..4f132fd892 100644 --- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java +++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java @@ -73,6 +73,7 @@ public class KafkaBufferIT { private PluginSetting pluginSetting; private KafkaBufferConfig kafkaBufferConfig; + private KafkaBufferConfig kafkaBufferCompressionConfig; @Mock private AcknowledgementSetManager acknowledgementSetManager; @Mock @@ -111,6 +112,13 @@ void setUp() { topicConfig = objectMapper.convertValue(topicConfigMap, BufferTopicConfig.class); + final Map topicConfigMapCompression = Map.of( + "name", topicName, + "group_id", "buffergroup-" + RandomStringUtils.randomAlphabetic(6), + "create_topic", true, + "encryption_key", "6fib8P/ML7Lh7lUEHCFYCt+bschigjNwmEZUctkP5dw=" // sample key + ); + bootstrapServersCommaDelimited = System.getProperty("tests.kafka.bootstrap_servers"); LOG.info("Using Kafka bootstrap servers: {}", bootstrapServersCommaDelimited); @@ -122,6 +130,15 @@ void setUp() { ); kafkaBufferConfig = objectMapper.convertValue(bufferConfigMap, KafkaBufferConfig.class); + final Map bufferCompressionConfigMap = Map.of( + "topics", List.of(topicConfigMapCompression), + "bootstrap_servers", List.of(bootstrapServersCommaDelimited), + "encryption", Map.of("type", "none"), + "producer_properties", Map.of("compression_type", "zstd") + ); + + kafkaBufferCompressionConfig = objectMapper.convertValue(bufferCompressionConfigMap, KafkaBufferConfig.class); + byteDecoder = null; } @@ -133,6 +150,10 @@ private KafkaBuffer createObjectUnderTest() { return new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, null, null, null, encryptionSupplier); } + private KafkaBuffer createObjectUnderTestWithCompression() { + return new KafkaBuffer(pluginSetting, kafkaBufferCompressionConfig, acknowledgementSetManager, null, null, null, encryptionSupplier); + } + @Test void write_and_read() throws TimeoutException { KafkaBuffer objectUnderTest = createObjectUnderTest(); @@ -160,6 +181,33 @@ void write_and_read() throws TimeoutException { assertThat(objectUnderTest.getInnerBufferRecordsInFlight(), equalTo(0)); } + @Test + void write_and_read_compression() throws TimeoutException { + KafkaBuffer objectUnderTest = createObjectUnderTestWithCompression(); + + Record record = createRecord(); + objectUnderTest.write(record, 1_000); + + final Map.Entry>, CheckpointState> readResult = awaitRead(objectUnderTest); + + assertThat(readResult, notNullValue()); + assertThat(readResult.getKey(), notNullValue()); + assertThat(readResult.getKey().size(), equalTo(1)); + + Record onlyResult = readResult.getKey().stream().iterator().next(); + + assertThat(onlyResult, notNullValue()); + assertThat(onlyResult.getData(), notNullValue()); + // TODO: The metadata is not included. It needs to be included in the Buffer, though not in the Sink. This may be something we make configurable in the consumer/producer - whether to serialize the metadata or not. + //assertThat(onlyResult.getData().getMetadata(), equalTo(record.getData().getMetadata())); + assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap())); + assertThat(objectUnderTest.getRecordsInFlight(), equalTo(0)); + assertThat(objectUnderTest.getInnerBufferRecordsInFlight(), equalTo(1)); + objectUnderTest.checkpoint(readResult.getValue()); + assertThat(objectUnderTest.getRecordsInFlight(), equalTo(0)); + assertThat(objectUnderTest.getInnerBufferRecordsInFlight(), equalTo(0)); + } + @Test void write_and_read_max_request_test() throws TimeoutException, NoSuchFieldException, IllegalAccessException { KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties(); @@ -286,6 +334,32 @@ void writeBytes_and_read() throws Exception { assertThat(onlyResult.getData().toMap(), equalTo(inputDataMap)); } + @Test + void writeBytes_and_read_with_compression() throws Exception { + byteDecoder = new JsonDecoder(); + + final KafkaBuffer objectUnderTest = createObjectUnderTestWithCompression(); + + final Map inputDataMap = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString(), UUID.randomUUID().toString(), UUID.randomUUID().toString()); + final byte[] bytes = objectMapper.writeValueAsBytes(inputDataMap); + final String key = UUID.randomUUID().toString(); + objectUnderTest.writeBytes(bytes, key, 1_000); + + final Map.Entry>, CheckpointState> readResult = awaitRead(objectUnderTest); + + assertThat(readResult, notNullValue()); + assertThat(readResult.getKey(), notNullValue()); + assertThat(readResult.getKey().size(), equalTo(1)); + + Record onlyResult = readResult.getKey().stream().iterator().next(); + + assertThat(onlyResult, notNullValue()); + assertThat(onlyResult.getData(), notNullValue()); + // TODO: The metadata is not included. It needs to be included in the Buffer, though not in the Sink. This may be something we make configurable in the consumer/producer - whether to serialize the metadata or not. + //assertThat(onlyResult.getData().getMetadata(), equalTo(record.getData().getMetadata())); + assertThat(onlyResult.getData().toMap(), equalTo(inputDataMap)); + } + @Test void write_puts_correctly_formatted_data_in_protobuf_wrapper() throws TimeoutException, IOException { final KafkaBuffer objectUnderTest = createObjectUnderTest(); @@ -371,6 +445,31 @@ class Encrypted { @BeforeEach void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPaddingException { + random = new Random(); + acknowledgementSetManager = mock(AcknowledgementSetManager.class); + acknowledgementSet = mock(AcknowledgementSet.class); + lenient().doAnswer((a) -> null).when(acknowledgementSet).complete(); + lenient().when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet); + objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); + + when(pluginSetting.getPipelineName()).thenReturn(UUID.randomUUID().toString()); + + topicName = "buffer-" + RandomStringUtils.randomAlphabetic(5); + + Map topicConfigMap = new java.util.HashMap<>(Map.of( + "name", topicName, + "group_id", "buffergroup-" + RandomStringUtils.randomAlphabetic(6), + "create_topic", true + )); + + topicConfig = objectMapper.convertValue(topicConfigMap, BufferTopicConfig.class); + + final Map bufferConfigMap = new java.util.HashMap<>(Map.of( + "topics", List.of(topicConfigMap), + "bootstrap_servers", List.of(bootstrapServersCommaDelimited), + "encryption", Map.of("type", "none") + )); + final KeyGenerator aesKeyGenerator = KeyGenerator.getInstance("AES"); aesKeyGenerator.init(256); final SecretKey secretKey = aesKeyGenerator.generateKey(); @@ -382,11 +481,11 @@ void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPadding final byte[] base64Bytes = Base64.getEncoder().encode(secretKey.getEncoded()); aesKey = new String(base64Bytes); - final Map topicConfigMap = objectMapper.convertValue(topicConfig, Map.class); topicConfigMap.put("encryption_key", aesKey); - final Map bufferConfigMap = objectMapper.convertValue(kafkaBufferConfig, Map.class); bufferConfigMap.put("topics", List.of(topicConfigMap)); kafkaBufferConfig = objectMapper.convertValue(bufferConfigMap, KafkaBufferConfig.class); + + byteDecoder = null; } @Test diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java index ff36fc4035..531f7cd58f 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerFactory.java @@ -22,9 +22,9 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.breaker.CircuitBreaker; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.codec.ByteDecoder; import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfig; import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfigAdapter; @@ -33,6 +33,7 @@ import org.opensearch.dataprepper.plugins.kafka.common.key.KeyFactory; import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory; import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; + import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig; @@ -40,6 +41,7 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig; + import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType; import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer; import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicConsumerMetrics; diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java index 66e59e7ac7..4031bb22f8 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaCustomProducerTest.java @@ -28,8 +28,8 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig; -import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; +import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig; import org.opensearch.dataprepper.plugins.kafka.service.SchemaService; import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink; import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicProducerMetrics; @@ -38,18 +38,20 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.UUID; +import java.util.concurrent.Future; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertArrayEquals; + import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import java.util.concurrent.Future; @ExtendWith(MockitoExtension.class)