Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class KafkaBufferIT {
private PluginSetting pluginSetting;

private KafkaBufferConfig kafkaBufferConfig;
private KafkaBufferConfig kafkaBufferCompressionConfig;
@Mock
private AcknowledgementSetManager acknowledgementSetManager;
@Mock
Expand Down Expand Up @@ -111,6 +112,13 @@ void setUp() {

topicConfig = objectMapper.convertValue(topicConfigMap, BufferTopicConfig.class);

final Map<String, Object> topicConfigMapCompression = Map.of(
"name", topicName,
"group_id", "buffergroup-" + RandomStringUtils.randomAlphabetic(6),
"create_topic", true,
"encryption_key", "6fib8P/ML7Lh7lUEHCFYCt+bschigjNwmEZUctkP5dw=" // sample key
);

bootstrapServersCommaDelimited = System.getProperty("tests.kafka.bootstrap_servers");

LOG.info("Using Kafka bootstrap servers: {}", bootstrapServersCommaDelimited);
Expand All @@ -122,6 +130,15 @@ void setUp() {
);
kafkaBufferConfig = objectMapper.convertValue(bufferConfigMap, KafkaBufferConfig.class);

final Map<String, Object> bufferCompressionConfigMap = Map.of(
"topics", List.of(topicConfigMapCompression),
"bootstrap_servers", List.of(bootstrapServersCommaDelimited),
"encryption", Map.of("type", "none"),
"producer_properties", Map.of("compression_type", "zstd")
);

kafkaBufferCompressionConfig = objectMapper.convertValue(bufferCompressionConfigMap, KafkaBufferConfig.class);

byteDecoder = null;
}

Expand All @@ -133,6 +150,10 @@ private KafkaBuffer createObjectUnderTest() {
return new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, null, null, null, encryptionSupplier);
}

private KafkaBuffer createObjectUnderTestWithCompression() {
return new KafkaBuffer(pluginSetting, kafkaBufferCompressionConfig, acknowledgementSetManager, null, null, null, encryptionSupplier);
}

@Test
void write_and_read() throws TimeoutException {
KafkaBuffer objectUnderTest = createObjectUnderTest();
Expand Down Expand Up @@ -160,6 +181,33 @@ void write_and_read() throws TimeoutException {
assertThat(objectUnderTest.getInnerBufferRecordsInFlight(), equalTo(0));
}

@Test
void write_and_read_compression() throws TimeoutException {
KafkaBuffer objectUnderTest = createObjectUnderTestWithCompression();

Record<Event> record = createRecord();
objectUnderTest.write(record, 1_000);

final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
assertThat(readResult.getKey().size(), equalTo(1));

Record<Event> onlyResult = readResult.getKey().stream().iterator().next();

assertThat(onlyResult, notNullValue());
assertThat(onlyResult.getData(), notNullValue());
// TODO: The metadata is not included. It needs to be included in the Buffer, though not in the Sink. This may be something we make configurable in the consumer/producer - whether to serialize the metadata or not.
//assertThat(onlyResult.getData().getMetadata(), equalTo(record.getData().getMetadata()));
assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap()));
assertThat(objectUnderTest.getRecordsInFlight(), equalTo(0));
assertThat(objectUnderTest.getInnerBufferRecordsInFlight(), equalTo(1));
objectUnderTest.checkpoint(readResult.getValue());
assertThat(objectUnderTest.getRecordsInFlight(), equalTo(0));
assertThat(objectUnderTest.getInnerBufferRecordsInFlight(), equalTo(0));
}

@Test
void write_and_read_max_request_test() throws TimeoutException, NoSuchFieldException, IllegalAccessException {
KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties();
Expand Down Expand Up @@ -286,6 +334,32 @@ void writeBytes_and_read() throws Exception {
assertThat(onlyResult.getData().toMap(), equalTo(inputDataMap));
}

@Test
void writeBytes_and_read_with_compression() throws Exception {
byteDecoder = new JsonDecoder();

final KafkaBuffer objectUnderTest = createObjectUnderTestWithCompression();

final Map<String, String> inputDataMap = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString(), UUID.randomUUID().toString(), UUID.randomUUID().toString());
final byte[] bytes = objectMapper.writeValueAsBytes(inputDataMap);
final String key = UUID.randomUUID().toString();
objectUnderTest.writeBytes(bytes, key, 1_000);

final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = awaitRead(objectUnderTest);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
assertThat(readResult.getKey().size(), equalTo(1));

Record<Event> onlyResult = readResult.getKey().stream().iterator().next();

assertThat(onlyResult, notNullValue());
assertThat(onlyResult.getData(), notNullValue());
// TODO: The metadata is not included. It needs to be included in the Buffer, though not in the Sink. This may be something we make configurable in the consumer/producer - whether to serialize the metadata or not.
//assertThat(onlyResult.getData().getMetadata(), equalTo(record.getData().getMetadata()));
assertThat(onlyResult.getData().toMap(), equalTo(inputDataMap));
}

@Test
void write_puts_correctly_formatted_data_in_protobuf_wrapper() throws TimeoutException, IOException {
final KafkaBuffer objectUnderTest = createObjectUnderTest();
Expand Down Expand Up @@ -371,6 +445,31 @@ class Encrypted {

@BeforeEach
void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPaddingException {
random = new Random();
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
acknowledgementSet = mock(AcknowledgementSet.class);
lenient().doAnswer((a) -> null).when(acknowledgementSet).complete();
lenient().when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet);
objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());

when(pluginSetting.getPipelineName()).thenReturn(UUID.randomUUID().toString());

topicName = "buffer-" + RandomStringUtils.randomAlphabetic(5);

Map<String, Object> topicConfigMap = new java.util.HashMap<>(Map.of(
"name", topicName,
"group_id", "buffergroup-" + RandomStringUtils.randomAlphabetic(6),
"create_topic", true
));

topicConfig = objectMapper.convertValue(topicConfigMap, BufferTopicConfig.class);

final Map<String, Object> bufferConfigMap = new java.util.HashMap<>(Map.of(
"topics", List.of(topicConfigMap),
"bootstrap_servers", List.of(bootstrapServersCommaDelimited),
"encryption", Map.of("type", "none")
));

final KeyGenerator aesKeyGenerator = KeyGenerator.getInstance("AES");
aesKeyGenerator.init(256);
final SecretKey secretKey = aesKeyGenerator.generateKey();
Expand All @@ -382,11 +481,11 @@ void setUp() throws NoSuchAlgorithmException, InvalidKeyException, NoSuchPadding
final byte[] base64Bytes = Base64.getEncoder().encode(secretKey.getEncoded());
aesKey = new String(base64Bytes);

final Map<String, Object> topicConfigMap = objectMapper.convertValue(topicConfig, Map.class);
topicConfigMap.put("encryption_key", aesKey);
final Map<String, Object> bufferConfigMap = objectMapper.convertValue(kafkaBufferConfig, Map.class);
bufferConfigMap.put("topics", List.of(topicConfigMap));
kafkaBufferConfig = objectMapper.convertValue(bufferConfigMap, KafkaBufferConfig.class);

byteDecoder = null;
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.breaker.CircuitBreaker;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfig;
import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfigAdapter;
Expand All @@ -33,13 +33,15 @@
import org.opensearch.dataprepper.plugins.kafka.common.key.KeyFactory;
import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;

import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaConsumerConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.PlainTextAuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;

import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicConsumerMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.service.SchemaService;
import org.opensearch.dataprepper.plugins.kafka.sink.DLQSink;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicProducerMetrics;
Expand All @@ -38,18 +38,20 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.UUID;
import java.util.concurrent.Future;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import java.util.concurrent.Future;


@ExtendWith(MockitoExtension.class)
Expand Down
Loading