Skip to content

Commit 540f3a1

Browse files
authored
FEAT: encryption extension integration with kafka (#5625)
* FEAT: encryption extension integration with kafka buffer Signed-off-by: George Chen <qchea@amazon.com>
1 parent 60ce980 commit 540f3a1

21 files changed

Lines changed: 505 additions & 14 deletions

File tree

data-prepper-plugins/encryption-plugin/src/main/java/org/opensearch/dataprepper/plugins/encryption/EncryptionPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.util.concurrent.TimeUnit;
2323
import java.util.stream.Collectors;
2424

25-
@DataPrepperExtensionPlugin(modelType = EncryptionPluginConfig.class, rootKeyJsonPath = "/encryption")
25+
@DataPrepperExtensionPlugin(modelType = EncryptionPluginConfig.class, rootKeyJsonPath = "/encryption", allowInPipelineConfigurations = true)
2626
public class EncryptionPlugin implements ExtensionPlugin {
2727
static final int PERIOD_IN_SECONDS = 60;
2828
private static final Logger LOG = LoggerFactory.getLogger(EncryptionPlugin.class);

data-prepper-plugins/kafka-plugins/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ dependencies {
2929
implementation project(':data-prepper-plugins:buffer-common')
3030
implementation project(':data-prepper-plugins:blocking-buffer')
3131
implementation project(':data-prepper-plugins:aws-plugin-api')
32+
implementation project(':data-prepper-plugins:encryption-plugin')
3233
// bump io.confluent:* dependencies correspondingly when bumping org.apache.kafka.*
3334
// https://docs.confluent.io/platform/current/release-notes/index.html
3435
implementation 'org.apache.kafka:kafka-clients:3.6.1'

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferIT.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.opensearch.dataprepper.model.event.Event;
2727
import org.opensearch.dataprepper.model.event.JacksonEvent;
2828
import org.opensearch.dataprepper.model.record.Record;
29+
import org.opensearch.dataprepper.plugins.encryption.EncryptionSupplier;
2930
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
3031
import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer;
3132
import org.opensearch.dataprepper.plugins.kafka.util.TestProducer;
@@ -76,6 +77,8 @@ public class KafkaBufferIT {
7677
private AcknowledgementSetManager acknowledgementSetManager;
7778
@Mock
7879
private AcknowledgementSet acknowledgementSet;
80+
@Mock
81+
private EncryptionSupplier encryptionSupplier;
7982

8083
private Random random;
8184

@@ -123,11 +126,11 @@ void setUp() {
123126
}
124127

125128
private KafkaBuffer createObjectUnderTestWithJsonDecoder() {
126-
return new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, new JsonDecoder(), null, null);
129+
return new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, new JsonDecoder(), null, null, encryptionSupplier);
127130
}
128131

129132
private KafkaBuffer createObjectUnderTest() {
130-
return new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, null, null, null);
133+
return new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, null, null, null, encryptionSupplier);
131134
}
132135

133136
@Test

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferOTelIT.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.opensearch.dataprepper.model.metric.JacksonHistogram;
2929
import org.opensearch.dataprepper.model.log.OpenTelemetryLog;
3030
import org.opensearch.dataprepper.model.trace.Span;
31+
import org.opensearch.dataprepper.plugins.encryption.EncryptionSupplier;
3132
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig;
3233
import org.opensearch.dataprepper.model.buffer.DelegatingBuffer;
3334
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -93,6 +94,8 @@ public class KafkaBufferOTelIT {
9394
private AcknowledgementSetManager acknowledgementSetManager;
9495
@Mock
9596
private BufferTopicConfig topicConfig;
97+
@Mock
98+
private EncryptionSupplier encryptionSupplier;
9699

97100
private DelegatingBuffer buffer;
98101

@@ -294,7 +297,7 @@ private void validateMetric(Event event) {
294297

295298
@Test
296299
void test_otel_metrics_with_kafka_buffer() throws Exception {
297-
KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, new OTelMetricDecoder(OTelOutputFormat.OPENSEARCH), null, null);
300+
KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, new OTelMetricDecoder(OTelOutputFormat.OPENSEARCH), null, null, encryptionSupplier);
298301
buffer = new KafkaDelegatingBuffer(kafkaBuffer);
299302
final ExportMetricsServiceRequest request = createExportMetricsServiceRequest();
300303
buffer.writeBytes(request.toByteArray(), null, 10_000);
@@ -366,7 +369,7 @@ private void validateLog(OpenTelemetryLog logRecord) throws Exception {
366369

367370
@Test
368371
void test_otel_logs_with_kafka_buffer() throws Exception {
369-
KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, new OTelLogsDecoder(OTelOutputFormat.OPENSEARCH), null, null);
372+
KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, new OTelLogsDecoder(OTelOutputFormat.OPENSEARCH), null, null, encryptionSupplier);
370373
buffer = new KafkaDelegatingBuffer(kafkaBuffer);
371374
final ExportLogsServiceRequest request = createExportLogsRequest();
372375
buffer.writeBytes(request.toByteArray(), null, 10_000);
@@ -437,7 +440,7 @@ private void validateSpan(Span span) throws Exception {
437440

438441
@Test
439442
void test_otel_traces_with_kafka_buffer() throws Exception {
440-
KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, new OTelTraceDecoder(OTelOutputFormat.OPENSEARCH), null, null);
443+
KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, new OTelTraceDecoder(OTelOutputFormat.OPENSEARCH), null, null, encryptionSupplier);
441444
buffer = new KafkaDelegatingBuffer(kafkaBuffer);
442445
final ExportTraceServiceRequest request = createExportTraceRequest();
443446
buffer.writeBytes(request.toByteArray(), null, 10_000);

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer_KmsIT.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.opensearch.dataprepper.model.event.JacksonEvent;
2727
import org.opensearch.dataprepper.model.plugin.PluginFactory;
2828
import org.opensearch.dataprepper.model.record.Record;
29+
import org.opensearch.dataprepper.plugins.encryption.EncryptionSupplier;
2930
import org.opensearch.dataprepper.plugins.kafka.util.TestProducer;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
@@ -78,6 +79,9 @@ public class KafkaBuffer_KmsIT {
7879
@Mock
7980
private AwsCredentialsSupplier awsCredentialsSupplier;
8081

82+
@Mock
83+
private EncryptionSupplier encryptionSupplier;
84+
8185
private Random random;
8286

8387
private BufferTopicConfig topicConfig;
@@ -132,7 +136,7 @@ void setUp() {
132136
}
133137

134138
private KafkaBuffer createObjectUnderTest() {
135-
return new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, null, awsCredentialsSupplier, null);
139+
return new KafkaBuffer(pluginSetting, kafkaBufferConfig, acknowledgementSetManager, null, awsCredentialsSupplier, null, encryptionSupplier);
136140
}
137141

138142
@Nested

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/BufferTopicConfig.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.fasterxml.jackson.annotation.JsonIgnore;
99
import com.fasterxml.jackson.annotation.JsonProperty;
1010
import jakarta.validation.Valid;
11+
import jakarta.validation.constraints.AssertTrue;
1112
import jakarta.validation.constraints.Size;
1213
import org.opensearch.dataprepper.model.types.ByteCount;
1314
import org.opensearch.dataprepper.plugins.kafka.configuration.CommonTopicConfig;
@@ -37,6 +38,9 @@ class BufferTopicConfig extends CommonTopicConfig implements TopicProducerConfig
3738
static final Integer DEFAULT_NUM_OF_WORKERS = 2;
3839
static final Duration DEFAULT_HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5);
3940

41+
@JsonProperty("encryption_id")
42+
private String encryptionId;
43+
4044
@JsonProperty("encryption_key")
4145
private String encryptionKey;
4246

@@ -119,6 +123,11 @@ public MessageFormat getSerdeFormat() {
119123
return MessageFormat.BYTES;
120124
}
121125

126+
@Override
127+
public String getEncryptionId() {
128+
return encryptionId;
129+
}
130+
122131
@Override
123132
public String getEncryptionKey() {
124133
return encryptionKey;
@@ -239,4 +248,13 @@ public long getFetchMinBytes() {
239248
public long getMaxPartitionFetchBytes() {
240249
return maxPartitionFetchBytes.getBytes();
241250
}
251+
252+
@AssertTrue(message = "Either encryption_id or encryption_key together with kms_config must be specified, " +
253+
"and only one of them can be specified")
254+
public boolean IsEncryptionAtRestSettingValid() {
255+
if (encryptionId != null && (encryptionKey != null || kmsConfig != null)) {
256+
return false;
257+
}
258+
return true;
259+
}
242260
}

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
2323
import org.apache.kafka.common.errors.RecordTooLargeException;
2424
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
25+
import org.opensearch.dataprepper.plugins.encryption.EncryptionSupplier;
2526
import org.opensearch.dataprepper.plugins.kafka.admin.KafkaAdminAccessor;
2627
import org.opensearch.dataprepper.plugins.kafka.buffer.serialization.BufferSerializationFactory;
2728
import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc;
@@ -73,9 +74,10 @@ public class KafkaBuffer extends AbstractBuffer<Record<Event>> {
7374
public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig,
7475
final AcknowledgementSetManager acknowledgementSetManager,
7576
final ByteDecoder byteDecoder, final AwsCredentialsSupplier awsCredentialsSupplier,
76-
final CircuitBreaker circuitBreaker) {
77+
final CircuitBreaker circuitBreaker,
78+
final EncryptionSupplier encryptionSupplier) {
7779
super(kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName()+"buffer"), pluginSetting.getPipelineName());
78-
final SerializationFactory serializationFactory = new BufferSerializationFactory(new CommonSerializationFactory());
80+
final SerializationFactory serializationFactory = new BufferSerializationFactory(new CommonSerializationFactory(), encryptionSupplier);
7981
final KafkaCustomProducerFactory kafkaCustomProducerFactory = new KafkaCustomProducerFactory(serializationFactory, awsCredentialsSupplier, new TopicServiceFactory());
8082
this.byteDecoder = byteDecoder;
8183
final String metricPrefixName = kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.kafka.buffer.serialization;
7+
8+
import com.google.protobuf.InvalidProtocolBufferException;
9+
import org.apache.kafka.common.serialization.Deserializer;
10+
import org.opensearch.dataprepper.model.encryption.EncryptionEngine;
11+
import org.opensearch.dataprepper.plugins.encryption.DefaultEncryptionEnvelope;
12+
import org.opensearch.dataprepper.plugins.kafka.buffer.KafkaBufferMessage;
13+
14+
/**
15+
* A Kafka {@link Deserializer} which deserializes data from a KafkaBuffer Protobuf message.
16+
*
17+
* @param <T> The output type
18+
*/
19+
class BufferMessageEncryptionDeserializer<T> implements Deserializer<T> {
20+
private final Deserializer<T> dataDeserializer;
21+
private final EncryptionEngine encryptionEngine;
22+
23+
public BufferMessageEncryptionDeserializer(final Deserializer<T> dataDeserializer,
24+
final EncryptionEngine encryptionEngine) {
25+
this.dataDeserializer = dataDeserializer;
26+
this.encryptionEngine = encryptionEngine;
27+
}
28+
29+
@Override
30+
public T deserialize(final String topic, final byte[] data) {
31+
final KafkaBufferMessage.BufferData bufferedData;
32+
try {
33+
bufferedData = KafkaBufferMessage.BufferData.parseFrom(data);
34+
} catch (final InvalidProtocolBufferException e) {
35+
throw new RuntimeException(e);
36+
}
37+
38+
final byte[] dataBytes = encryptionEngine.decrypt(
39+
DefaultEncryptionEnvelope.builder()
40+
.encryptedDataKey(bufferedData.getEncryptedDataKey().toStringUtf8())
41+
.encryptedData(bufferedData.getData().toByteArray())
42+
.build());
43+
44+
return dataDeserializer.deserialize(topic, dataBytes);
45+
}
46+
47+
Deserializer<T> getDataDeserializer() {
48+
return dataDeserializer;
49+
}
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.kafka.buffer.serialization;
7+
8+
import com.google.protobuf.ByteString;
9+
import org.apache.kafka.common.serialization.Serializer;
10+
import org.opensearch.dataprepper.model.encryption.EncryptionEngine;
11+
import org.opensearch.dataprepper.model.encryption.EncryptionEnvelope;
12+
import org.opensearch.dataprepper.plugins.kafka.buffer.KafkaBufferMessage;
13+
14+
import java.util.Objects;
15+
16+
/**
17+
* A Kafka {@link Serializer} which serializes data into a KafkaBuffer Protobuf message.
18+
*
19+
* @param <T> The input type
20+
*/
21+
class BufferMessageEncryptionSerializer<T> implements Serializer<T> {
22+
private final Serializer<T> dataSerializer;
23+
private final EncryptionEngine encryptionEngine;
24+
25+
public BufferMessageEncryptionSerializer(final Serializer<T> dataSerializer,
26+
final EncryptionEngine encryptionEngine) {
27+
this.dataSerializer = Objects.requireNonNull(dataSerializer);
28+
this.encryptionEngine = Objects.requireNonNull(encryptionEngine);
29+
}
30+
31+
@Override
32+
public byte[] serialize(final String topic, final T data) {
33+
if (data == null)
34+
return null;
35+
36+
final byte[] serializedData = dataSerializer.serialize(topic, data);
37+
final EncryptionEnvelope encryptionEnvelope = encryptionEngine.encrypt(serializedData);
38+
39+
final KafkaBufferMessage.BufferData bufferedData = buildProtobufMessage(encryptionEnvelope);
40+
41+
return bufferedData.toByteArray();
42+
}
43+
44+
Serializer<T> getDataSerializer() {
45+
return dataSerializer;
46+
}
47+
48+
private KafkaBufferMessage.BufferData buildProtobufMessage(final EncryptionEnvelope encryptionEnvelope) {
49+
return KafkaBufferMessage.BufferData.newBuilder()
50+
.setEncryptedDataKey(ByteString.copyFromUtf8(encryptionEnvelope.getEncryptedDataKey()))
51+
.setData(ByteString.copyFrom(encryptionEnvelope.getEncryptedData()))
52+
.setEncrypted(true)
53+
.setMessageFormat(KafkaBufferMessage.MessageFormat.MESSAGE_FORMAT_BYTES)
54+
.build();
55+
}
56+
}

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/serialization/BufferSerializationFactory.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,33 +7,60 @@
77

88
import org.apache.kafka.common.serialization.Deserializer;
99
import org.apache.kafka.common.serialization.Serializer;
10+
import org.opensearch.dataprepper.model.encryption.EncryptionEngine;
11+
import org.opensearch.dataprepper.plugins.encryption.EncryptionSupplier;
1012
import org.opensearch.dataprepper.plugins.kafka.common.KafkaDataConfig;
1113
import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
1214

15+
import java.util.Objects;
16+
1317
/**
1418
* An implementation of {@link SerializationFactory} specifically for the Kafka buffer.
1519
* It makes use the Kafka buffer's Protobuf wrapper.
1620
*/
1721
public class BufferSerializationFactory implements SerializationFactory {
1822
private final SerializationFactory innerSerializationFactory;
23+
private final EncryptionSupplier encryptionSupplier;
1924

2025
/**
2126
* Create a new instance using an inner {@link SerializationFactory}. You typically pass in
2227
* the {@link org.opensearch.dataprepper.plugins.kafka.common.serialization.CommonSerializationFactory}.
2328
*
2429
* @param innerSerializationFactory The serialization factory for the inner data.
2530
*/
26-
public BufferSerializationFactory(final SerializationFactory innerSerializationFactory) {
31+
public BufferSerializationFactory(final SerializationFactory innerSerializationFactory,
32+
final EncryptionSupplier encryptionSupplier) {
2733
this.innerSerializationFactory = innerSerializationFactory;
34+
this.encryptionSupplier = encryptionSupplier;
2835
}
2936

3037
@Override
3138
public Deserializer<?> getDeserializer(final KafkaDataConfig dataConfig) {
32-
return new BufferMessageDeserializer<>(innerSerializationFactory.getDeserializer(dataConfig));
39+
final String encryptionId = dataConfig.getEncryptionId();
40+
final Deserializer<?> dataDeserializer = innerSerializationFactory.getDeserializer(dataConfig);
41+
if (Objects.isNull(encryptionId)) {
42+
return new BufferMessageDeserializer<>(dataDeserializer);
43+
} else {
44+
final EncryptionEngine encryptionEngine = encryptionSupplier.getEncryptionEngine(encryptionId);
45+
if (encryptionEngine == null) {
46+
throw new IllegalArgumentException("Encryption engine not found for encryption_id: " + encryptionId);
47+
}
48+
return new BufferMessageEncryptionDeserializer<>(dataDeserializer, encryptionEngine);
49+
}
3350
}
3451

3552
@Override
3653
public Serializer<?> getSerializer(final KafkaDataConfig dataConfig) {
37-
return new BufferMessageSerializer<>(innerSerializationFactory.getSerializer(dataConfig), dataConfig);
54+
final String encryptionId = dataConfig.getEncryptionId();
55+
final Serializer<?> dataSerializer = innerSerializationFactory.getSerializer(dataConfig);
56+
if (Objects.isNull(encryptionId)) {
57+
return new BufferMessageSerializer<>(innerSerializationFactory.getSerializer(dataConfig), dataConfig);
58+
} else {
59+
final EncryptionEngine encryptionEngine = encryptionSupplier.getEncryptionEngine(encryptionId);
60+
if (encryptionEngine == null) {
61+
throw new IllegalArgumentException("Encryption engine not found for encryption_id: " + encryptionId);
62+
}
63+
return new BufferMessageEncryptionSerializer<>(dataSerializer, encryptionEngine);
64+
}
3865
}
3966
}

0 commit comments

Comments
 (0)