Skip to content

Commit 0214745

Browse files
authored
[feat][client] Support configure MessageCrypto in ProducerBuilder (#19939)
Signed-off-by: tison <wander4096@gmail.com>
1 parent 68c10ee commit 0214745

3 files changed

Lines changed: 41 additions & 16 deletions

File tree

pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,17 @@ public interface ProducerBuilder<T> extends Cloneable {
389389
*/
390390
ProducerBuilder<T> defaultCryptoKeyReader(Map<String, String> publicKeys);
391391

392+
/**
393+
* Sets a {@link MessageCrypto}.
394+
*
395+
* <p>Contains methods to encrypt/decrypt messages for end-to-end encryption.
396+
*
397+
* @param messageCrypto
398+
* MessageCrypto object
399+
* @return the producer builder instance
400+
*/
401+
ProducerBuilder<T> messageCrypto(MessageCrypto messageCrypto);
402+
392403
/**
393404
* Add public encryption key, used by producer to encrypt the data key.
394405
*

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.pulsar.client.api.CompressionType;
3535
import org.apache.pulsar.client.api.CryptoKeyReader;
3636
import org.apache.pulsar.client.api.HashingScheme;
37+
import org.apache.pulsar.client.api.MessageCrypto;
3738
import org.apache.pulsar.client.api.MessageRouter;
3839
import org.apache.pulsar.client.api.MessageRoutingMode;
3940
import org.apache.pulsar.client.api.Producer;
@@ -221,6 +222,12 @@ public ProducerBuilder<T> defaultCryptoKeyReader(@NonNull Map<String, String> pu
221222
return cryptoKeyReader(DefaultCryptoKeyReader.builder().publicKeys(publicKeys).build());
222223
}
223224

225+
@Override
226+
public ProducerBuilder<T> messageCrypto(MessageCrypto messageCrypto) {
227+
conf.setMessageCrypto(messageCrypto);
228+
return this;
229+
}
230+
224231
@Override
225232
public ProducerBuilder<T> addEncryptionKey(String key) {
226233
checkArgument(StringUtils.isNotBlank(key), "Encryption key cannot be blank");

pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@
1818
*/
1919
package org.apache.pulsar.client.impl;
2020

21+
import static org.mockito.Mockito.any;
22+
import static org.mockito.Mockito.eq;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.when;
25+
import static org.testng.Assert.assertNotNull;
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.TimeUnit;
2130
import org.apache.pulsar.client.api.Message;
2231
import org.apache.pulsar.client.api.MessageRouter;
2332
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -26,34 +35,24 @@
2635
import org.apache.pulsar.client.api.Schema;
2736
import org.apache.pulsar.client.api.TopicMetadata;
2837
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
38+
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
2939
import org.testng.annotations.BeforeClass;
3040
import org.testng.annotations.Test;
3141

32-
import java.util.HashMap;
33-
import java.util.Map;
34-
import java.util.concurrent.CompletableFuture;
35-
import java.util.concurrent.TimeUnit;
36-
37-
import static org.mockito.Mockito.any;
38-
import static org.mockito.Mockito.eq;
39-
import static org.mockito.Mockito.mock;
40-
import static org.mockito.Mockito.when;
41-
import static org.testng.Assert.assertNotNull;
42-
4342
/**
4443
* Unit tests of {@link ProducerBuilderImpl}.
4544
*/
4645
public class ProducerBuilderImplTest {
4746

4847
private static final String TOPIC_NAME = "testTopicName";
4948
private PulsarClientImpl client;
50-
private ProducerBuilderImpl producerBuilderImpl;
49+
private ProducerBuilderImpl<byte[]> producerBuilderImpl;
5150

5251
@BeforeClass(alwaysRun = true)
5352
public void setup() {
54-
Producer producer = mock(Producer.class);
53+
Producer<?> producer = mock(Producer.class);
5554
client = mock(PulsarClientImpl.class);
56-
producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
55+
producerBuilderImpl = new ProducerBuilderImpl<>(client, Schema.BYTES);
5756
when(client.newProducer()).thenReturn(producerBuilderImpl);
5857

5958
when(client.createProducerAsync(
@@ -66,8 +65,8 @@ public void testProducerBuilderImpl() throws PulsarClientException {
6665
Map<String, String> properties = new HashMap<>();
6766
properties.put("Test-Key2", "Test-Value2");
6867

69-
producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
70-
Producer producer = producerBuilderImpl.topic(TOPIC_NAME)
68+
producerBuilderImpl = new ProducerBuilderImpl<>(client, Schema.BYTES);
69+
Producer<?> producer = producerBuilderImpl.topic(TOPIC_NAME)
7170
.producerName("Test-Producer")
7271
.maxPendingMessages(2)
7372
.addEncryptionKey("Test-EncryptionKey")
@@ -78,6 +77,14 @@ public void testProducerBuilderImpl() throws PulsarClientException {
7877
assertNotNull(producer);
7978
}
8079

80+
@Test
81+
public void testProducerBuilderImplWhenMessageCryptoSet() throws PulsarClientException {
82+
producerBuilderImpl = new ProducerBuilderImpl<>(client, Schema.BYTES);
83+
producerBuilderImpl.topic(TOPIC_NAME).messageCrypto(new MessageCryptoBc("ctx1", true));
84+
assertNotNull(producerBuilderImpl.create());
85+
assertNotNull(producerBuilderImpl.getConf().getMessageCrypto());
86+
}
87+
8188
@Test
8289
public void testProducerBuilderImplWhenMessageRoutingModeAndMessageRouterAreNotSet() throws PulsarClientException {
8390
producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);

0 commit comments

Comments
 (0)