Skip to content

Commit 994b1be

Browse files
[improve][pip] PIP-432: Add isEncrypted field to EncryptionContext (apache#24481)
1 parent bc5b933 commit 994b1be

7 files changed

Lines changed: 179 additions & 12 deletions

File tree

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
# PIP-432: Add isEncrypted field to EncryptionContext
2+
3+
# Background knowledge
4+
5+
Apache Pulsar supports client-side encryption where messages can be encrypted by producers and decrypted by consumers. When a message is encrypted, Pulsar includes an `EncryptionContext` with each message that contains encryption metadata such as:
6+
7+
- **Encryption keys**: The encrypted data encryption keys used for message encryption
8+
- **Encryption parameters**: Additional parameters like initialization vectors (IV)
9+
- **Encryption algorithm**: The algorithm used (e.g., RSA, ECDSA)
10+
- **Compression information**: Whether compression was applied before encryption
11+
12+
**Key concepts:**
13+
- **EncryptionContext**: A metadata object attached to encrypted messages containing encryption-related information
14+
- **CryptoKeyReader**: An interface that provides public/private keys for encryption/decryption operations
15+
- **ConsumerCryptoFailureAction**: Determines how consumers handle decryption failures:
16+
- `FAIL`: Fail message consumption (default)
17+
- `DISCARD`: Silently discard the message
18+
- `CONSUME`: Deliver the encrypted message to the application
19+
20+
Currently, when `ConsumerCryptoFailureAction.CONSUME` is configured, consumers can receive encrypted messages even when decryption fails (e.g., missing private key, mismatched keys). However, applications have no way to determine whether the received message was successfully decrypted or is still encrypted.
21+
22+
# Motivation
23+
24+
Applications using Pulsar's encryption feature with `ConsumerCryptoFailureAction.CONSUME` need to determine whether received messages were successfully decrypted or if decryption failed. This is essential for:
25+
26+
1. **Error handling**: Applications need to know when they receive encrypted (undecrypted) data to handle it appropriately
27+
2. **Monitoring**: Applications want to track decryption success/failure rates for monitoring and alerting
28+
3. **Manual decryption**: When automatic decryption fails, applications may want to attempt manual decryption using the EncryptionContext
29+
4. **Security compliance**: Applications need to ensure they're not inadvertently processing encrypted data as plain text
30+
31+
**Current situation:**
32+
- Consumers with `CONSUME` action receive messages regardless of decryption success
33+
- No programmatic way to distinguish between successfully decrypted and failed decryption messages
34+
- Applications must implement workarounds to detect encrypted vs. decrypted content
35+
36+
**Use cases this solves:**
37+
1. Consumer without private key configured → should know decryption failed
38+
2. Consumer with mismatched private key → should know decryption failed
39+
3. Consumer with correct private key → should know decryption succeeded
40+
41+
# Goals
42+
43+
## In Scope
44+
45+
- Add an `isEncrypted` boolean field to the `EncryptionContext` class
46+
- Update consumer decryption logic to populate this field correctly
47+
- Ensure the field accurately reflects decryption status for all encryption scenarios
48+
- Maintain backward compatibility with existing applications
49+
- Update existing encryption tests to verify the new functionality
50+
51+
## Out of Scope
52+
53+
- Changes to encryption/decryption algorithms or protocols
54+
- Modifications to `ConsumerCryptoFailureAction` behavior
55+
- Performance improvements to encryption/decryption operations
56+
- New encryption features or capabilities
57+
- Changes to producer-side encryption logic
58+
59+
# High Level Design
60+
61+
The solution adds a simple boolean field `isEncrypted` to the existing `EncryptionContext` class. This field is set during message processing in the consumer:
62+
63+
1. **Successful decryption**: When a consumer successfully decrypts a message, `isEncrypted` is set to `false`
64+
2. **Failed decryption**: When decryption fails (missing key, wrong key, etc.) but `ConsumerCryptoFailureAction.CONSUME` is configured, `isEncrypted` is set to `true`
65+
3. **No encryption**: For non-encrypted messages, no `EncryptionContext` is created (existing behavior)
66+
67+
The field is populated in the consumer's message creation logic, specifically in the `createEncryptionContext()` method where the decryption success/failure status is already known.
68+
69+
Applications can then check this field to determine if the received message payload is encrypted or decrypted:
70+
71+
```java
72+
Message<byte[]> message = consumer.receive();
73+
Optional<EncryptionContext> ctx = message.getEncryptionCtx();
74+
if (ctx.isPresent()) {
75+
if (ctx.get().isEncrypted()) {
76+
// Handle encrypted message - decryption failed
77+
handleEncryptedMessage(message, ctx.get());
78+
} else {
79+
// Handle decrypted message - decryption succeeded
80+
handleDecryptedMessage(message);
81+
}
82+
}
83+
```
84+
85+
## Public-facing Changes
86+
87+
### Public API
88+
89+
**New method available via Lombok-generated getter:**
90+
```java
91+
public boolean isEncrypted()
92+
```
93+
94+
**Usage pattern:**
95+
```java
96+
Message<T> message = consumer.receive();
97+
Optional<EncryptionContext> encryptionCtx = message.getEncryptionCtx();
98+
if (encryptionCtx.isPresent()) {
99+
boolean encrypted = encryptionCtx.get().isEncrypted();
100+
if (encrypted) {
101+
// Message is encrypted (decryption failed)
102+
} else {
103+
// Message is decrypted (decryption succeeded)
104+
}
105+
}
106+
```
107+
108+
**Breaking Changes:** None - this is a purely additive change.
109+
110+
# Backward & Forward Compatibility
111+
112+
No compatibility concerns.
113+
114+
## Downgrade / Rollback
115+
116+
Rolling back to a previous Pulsar version:
117+
1. Applications using `isEncrypted()` will get compilation errors - this is expected
118+
2. Remove calls to `isEncrypted()` from application code
119+
3. Downgrade Pulsar client library
120+
4. No data loss or corruption risk

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3297,6 +3297,11 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
32973297
String expectedMessage = "my-message-" + msgNum++;
32983298
Assert.assertNotEquals(receivedMessage, expectedMessage, "Received encrypted message " + receivedMessage
32993299
+ " should not match the expected message " + expectedMessage);
3300+
// Verify that decryption failed since no CryptoKeyReader is configured
3301+
Optional<EncryptionContext> encryptionCtx = msg.getEncryptionCtx();
3302+
Assert.assertTrue(encryptionCtx.isPresent(), "EncryptionContext should be present for encrypted message");
3303+
Assert.assertTrue(encryptionCtx.get().isEncrypted(),
3304+
"isEncrypted should be true when no CryptoKeyReader is configured");
33003305
consumer.acknowledgeCumulative(msg);
33013306
} catch (Exception e) {
33023307
e.printStackTrace();
@@ -3313,8 +3318,12 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
33133318
for (int i = msgNum; i < totalMsg - 1; i++) {
33143319
msg = (MessageImpl<byte[]>) consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
33153320
// verify that encrypted message contains encryption-context
3316-
msg.getEncryptionCtx()
3321+
Optional<EncryptionContext> encryptionCtx = msg.getEncryptionCtx();
3322+
EncryptionContext ctx = encryptionCtx
33173323
.orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
3324+
// Verify that decryption succeeded since CryptoKeyReader is properly configured
3325+
Assert.assertFalse(ctx.isEncrypted(),
3326+
"isEncrypted should be false when CryptoKeyReader is properly configured and decryption succeeds");
33183327
String receivedMessage = new String(msg.getData());
33193328
log.debug("Received message: [{}]", receivedMessage);
33203329
String expectedMessage = "my-message-" + i;
@@ -3399,6 +3408,12 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
33993408
TopicMessageImpl<byte[]> msg =
34003409
(TopicMessageImpl<byte[]>) consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
34013410

3411+
// Verify that decryption failed since no CryptoKeyReader is configured for the consumer
3412+
Optional<EncryptionContext> encryptionCtx = msg.getEncryptionCtx();
3413+
Assert.assertTrue(encryptionCtx.isPresent(), "EncryptionContext should be present for encrypted message");
3414+
Assert.assertTrue(encryptionCtx.get().isEncrypted(),
3415+
"isEncrypted should be true when consumer has no CryptoKeyReader configured");
3416+
34023417
String receivedMessage = decryptMessage(msg, encryptionKeyName, new EncKeyReader());
34033418
assertEquals(message, receivedMessage);
34043419

@@ -5250,6 +5265,15 @@ public void testE2EEncryptionWithCompression() throws Exception {
52505265
for (int i = 0; i < 10; i++) {
52515266
final var msg = consumer.receive(5, TimeUnit.SECONDS);
52525267
assertNotNull(msg);
5268+
// Verify that decryption failed due to mismatched keys
5269+
if (msg instanceof MessageImpl) {
5270+
MessageImpl<String> msgImpl = (MessageImpl<String>) msg;
5271+
Optional<EncryptionContext> encryptionCtx = msgImpl.getEncryptionCtx();
5272+
Assert.assertTrue(encryptionCtx.isPresent(),
5273+
"EncryptionContext should be present for encrypted message");
5274+
Assert.assertTrue(encryptionCtx.get().isEncrypted(),
5275+
"isEncrypted should be true when using mismatched keys for decryption");
5276+
}
52535277
consumer.acknowledge(msg);
52545278
}
52555279

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void testCompactedOutMessages() throws Exception {
8282
// shove it in the sideways
8383
consumer.receiveIndividualMessagesFromBatch(brokerEntryMetadata, metadata, 0, null,
8484
batchBuffer, new MessageIdData().setLedgerId(1234).setEntryId(567),
85-
consumer.cnx(), DEFAULT_CONSUMER_EPOCH);
85+
consumer.cnx(), DEFAULT_CONSUMER_EPOCH, false);
8686
Message<?> m = consumer.receive();
8787
assertEquals(((BatchMessageIdImpl) m.getMessageId()).getLedgerId(), 1234);
8888
assertEquals(((BatchMessageIdImpl) m.getMessageId()).getEntryId(), 567);

pulsar-client-api/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ public class EncryptionContext {
3939
private CompressionType compressionType;
4040
private int uncompressedMessageSize;
4141
private Optional<Integer> batchSize;
42+
// Indicates whether the message payload remains encrypted (true) or has been successfully decrypted (false)
43+
private boolean isEncrypted;
4244

4345
/**
4446
* Encryption key with metadata.

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1290,7 +1290,8 @@ protected <V> MessageImpl<V> newSingleMessage(final int index,
12901290
final BitSetRecyclable ackBitSet,
12911291
final BitSet ackSetInMessageId,
12921292
final int redeliveryCount,
1293-
final long consumerEpoch) {
1293+
final long consumerEpoch,
1294+
final boolean isEncrypted) {
12941295
if (log.isDebugEnabled()) {
12951296
log.debug("[{}] [{}] processing message num - {} in batch", subscription, consumerName, index);
12961297
}
@@ -1328,7 +1329,8 @@ protected <V> MessageImpl<V> newSingleMessage(final int index,
13281329
final ByteBuf payloadBuffer = (singleMessagePayload != null) ? singleMessagePayload : payload;
13291330
final MessageImpl<V> message = MessageImpl.create(topicName.toString(), batchMessageIdImpl,
13301331
msgMetadata, singleMessageMetadata, payloadBuffer,
1331-
createEncryptionContext(msgMetadata), cnx(), schema, redeliveryCount, poolMessages, consumerEpoch);
1332+
createEncryptionContext(msgMetadata, isEncrypted), cnx(), schema, redeliveryCount,
1333+
poolMessages, consumerEpoch);
13321334
message.setBrokerEntryMetadata(brokerEntryMetadata);
13331335
return message;
13341336
} catch (IOException | IllegalStateException e) {
@@ -1347,8 +1349,21 @@ protected <V> MessageImpl<V> newMessage(final MessageIdImpl messageId,
13471349
final Schema<V> schema,
13481350
final int redeliveryCount,
13491351
final long consumerEpoch) {
1352+
return newMessage(messageId, brokerEntryMetadata, messageMetadata, payload, schema, redeliveryCount,
1353+
consumerEpoch, false);
1354+
}
1355+
1356+
protected <V> MessageImpl<V> newMessage(final MessageIdImpl messageId,
1357+
final BrokerEntryMetadata brokerEntryMetadata,
1358+
final MessageMetadata messageMetadata,
1359+
final ByteBuf payload,
1360+
final Schema<V> schema,
1361+
final int redeliveryCount,
1362+
final long consumerEpoch,
1363+
final boolean isEncrypted) {
13501364
final MessageImpl<V> message = MessageImpl.create(topicName.toString(), messageId, messageMetadata, payload,
1351-
createEncryptionContext(messageMetadata), cnx(), schema, redeliveryCount, poolMessages, consumerEpoch);
1365+
createEncryptionContext(messageMetadata, isEncrypted), cnx(), schema, redeliveryCount,
1366+
poolMessages, consumerEpoch);
13521367
message.setBrokerEntryMetadata(brokerEntryMetadata);
13531368
return message;
13541369
}
@@ -1532,7 +1547,7 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien
15321547

15331548
final MessageImpl<T> message =
15341549
newMessage(msgId, brokerEntryMetadata, msgMetadata, uncompressedPayload,
1535-
schema, redeliveryCount, consumerEpoch);
1550+
schema, redeliveryCount, consumerEpoch, isMessageUndecryptable);
15361551
uncompressedPayload.release();
15371552

15381553
if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) {
@@ -1552,7 +1567,7 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien
15521567
} else {
15531568
// handle batch message enqueuing; uncompressed payload has all messages in batch
15541569
receiveIndividualMessagesFromBatch(brokerEntryMetadata, msgMetadata, redeliveryCount, ackSet,
1555-
uncompressedPayload, messageId, cnx, consumerEpoch);
1570+
uncompressedPayload, messageId, cnx, consumerEpoch, isMessageUndecryptable);
15561571

15571572
uncompressedPayload.release();
15581573
}
@@ -1752,7 +1767,8 @@ private void interceptAndComplete(final Message<T> message, final CompletableFut
17521767

17531768
void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata,
17541769
int redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload,
1755-
MessageIdData messageId, ClientCnx cnx, long consumerEpoch) {
1770+
MessageIdData messageId, ClientCnx cnx, long consumerEpoch,
1771+
boolean isEncrypted) {
17561772
int batchSize = msgMetadata.getNumMessagesInBatch();
17571773

17581774
// create ack tracker for entry aka batch
@@ -1775,7 +1791,7 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,
17751791
for (int i = 0; i < batchSize; ++i) {
17761792
final MessageImpl<T> message = newSingleMessage(i, batchSize, brokerEntryMetadata, msgMetadata,
17771793
singleMessageMetadata, uncompressedPayload, batchMessage, schema, true,
1778-
ackBitSet, ackSetInMessageId, redeliveryCount, consumerEpoch);
1794+
ackBitSet, ackSetInMessageId, redeliveryCount, consumerEpoch, isEncrypted);
17791795
if (message == null) {
17801796
// If it is not in ackBitSet, it means Broker does not want to deliver it to the client, and
17811797
// did not decrease the permits in the broker-side.
@@ -2905,9 +2921,11 @@ private boolean isMessageUndecryptable(MessageMetadata msgMetadata) {
29052921
* Create EncryptionContext if message payload is encrypted.
29062922
*
29072923
* @param msgMetadata
2924+
* @param isEncrypted
29082925
* @return {@link Optional}<{@link EncryptionContext}>
29092926
*/
2910-
private Optional<EncryptionContext> createEncryptionContext(MessageMetadata msgMetadata) {
2927+
private Optional<EncryptionContext> createEncryptionContext(MessageMetadata msgMetadata,
2928+
boolean isEncrypted) {
29112929

29122930
EncryptionContext encryptionCtx = null;
29132931
if (msgMetadata.getEncryptionKeysCount() > 0) {
@@ -2930,6 +2948,7 @@ private Optional<EncryptionContext> createEncryptionContext(MessageMetadata msgM
29302948
.setCompressionType(CompressionCodecProvider.convertFromWireProtocol(msgMetadata.getCompression()));
29312949
encryptionCtx.setUncompressedMessageSize(msgMetadata.getUncompressedSize());
29322950
encryptionCtx.setBatchSize(batchSize);
2951+
encryptionCtx.setEncrypted(isEncrypted);
29332952
}
29342953
return Optional.ofNullable(encryptionCtx);
29352954
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ public <T> Message<T> getMessageAt(int index,
137137
ackBitSet,
138138
ackSetInMessageId,
139139
redeliveryCount,
140-
consumerEpoch);
140+
consumerEpoch,
141+
false);
141142
} finally {
142143
payloadBuffer.release();
143144
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,8 @@ protected void tryTriggerListener() {
194194
@Override
195195
void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata,
196196
int redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload,
197-
MessageIdData messageId, ClientCnx cnx, long consumerEpoch) {
197+
MessageIdData messageId, ClientCnx cnx, long consumerEpoch,
198+
boolean isEncrypted) {
198199
log.warn(
199200
"Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size",
200201
subscription, consumerName);

0 commit comments

Comments
 (0)