Skip to content

Commit 2c4656f

Browse files
Support for tombstone null message
1 parent 03fe428 commit 2c4656f

9 files changed

Lines changed: 141 additions & 0 deletions

File tree

include/pulsar/Message.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,17 @@ class PULSAR_PUBLIC Message {
159159
*/
160160
bool hasOrderingKey() const;
161161

162+
/**
163+
* Check if the message has a null value.
164+
*
165+
* Messages with null values are used as tombstones on compacted topics
166+
* to delete the message for a specific key.
167+
*
168+
* @return true if the message has a null value (tombstone)
169+
* false if the message has actual payload data
170+
*/
171+
bool hasNullValue() const;
172+
162173
/**
163174
* Get the UTC based timestamp in milliseconds referring to when the message was published by the client
164175
* producer

include/pulsar/MessageBuilder.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,17 @@ class PULSAR_PUBLIC MessageBuilder {
156156
*/
157157
MessageBuilder& disableReplication(bool flag);
158158

159+
/**
160+
* Mark the message as having a null value.
161+
*
162+
* This is used for messages on compacted topics where a null value
163+
* acts as a tombstone for a specific key, removing the message from
164+
* the compacted view.
165+
*
166+
* @return the message builder instance
167+
*/
168+
MessageBuilder& setNullValue();
169+
159170
/**
160171
* create a empty message, with no properties or data
161172
*

include/pulsar/c/message.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,15 @@ PULSAR_PUBLIC void pulsar_message_set_replication_clusters(pulsar_message_t *mes
127127
*/
128128
PULSAR_PUBLIC void pulsar_message_disable_replication(pulsar_message_t *message, int flag);
129129

130+
/**
131+
* Mark the message as having a null value.
132+
*
133+
* This is used for messages on compacted topics where a null value
134+
* acts as a tombstone for a specific key, removing the message from
135+
* the compacted view.
136+
*/
137+
PULSAR_PUBLIC void pulsar_message_set_null_value(pulsar_message_t *message);
138+
130139
/// Accessor for built messages
131140

132141
/**
@@ -221,6 +230,16 @@ PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message,
221230
*/
222231
PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message);
223232

233+
/**
234+
* Check if the message has a null value.
235+
*
236+
* Messages with null values are used as tombstones on compacted topics
237+
* to delete the message for a specific key.
238+
*
239+
* @return 1 if the message has a null value, 0 otherwise
240+
*/
241+
PULSAR_PUBLIC int pulsar_message_has_null_value(pulsar_message_t *message);
242+
224243
#ifdef __cplusplus
225244
}
226245
#endif

lib/Commands.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,10 @@ static std::pair<std::unique_ptr<char[]>, size_t> serializeSingleMessageMetadata
871871
metadata.set_sequence_id(msgMetadata.sequence_id());
872872
}
873873

874+
if (msgMetadata.null_value()) {
875+
metadata.set_null_value(true);
876+
}
877+
874878
size_t size = metadata.ByteSizeLong();
875879
std::unique_ptr<char[]> data{new char[size]};
876880
metadata.SerializeToArray(data.get(), size);

lib/Message.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,12 @@ Message::Message(const MessageId& messageID, proto::BrokerEntryMetadata& brokerE
123123
} else {
124124
impl_->metadata.clear_sequence_id();
125125
}
126+
127+
if (singleMetadata.null_value()) {
128+
impl_->metadata.set_null_value(true);
129+
} else {
130+
impl_->metadata.clear_null_value();
131+
}
126132
}
127133

128134
const MessageId& Message::getMessageId() const {
@@ -177,6 +183,13 @@ const std::string& Message::getOrderingKey() const {
177183
return impl_->getOrderingKey();
178184
}
179185

186+
bool Message::hasNullValue() const {
187+
if (impl_) {
188+
return impl_->metadata.null_value();
189+
}
190+
return false;
191+
}
192+
180193
const std::string& Message::getTopicName() const {
181194
if (!impl_) {
182195
return emptyString;

lib/MessageBuilder.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,12 @@ MessageBuilder& MessageBuilder::disableReplication(bool flag) {
157157
return *this;
158158
}
159159

160+
MessageBuilder& MessageBuilder::setNullValue() {
161+
checkMetadata();
162+
impl_->metadata.set_null_value(true);
163+
return *this;
164+
}
165+
160166
const char* MessageBuilder::data() const {
161167
assert(impl_->payload.data());
162168
return impl_->payload.data();

lib/c/c_Message.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ void pulsar_message_disable_replication(pulsar_message_t *message, int flag) {
8181
message->builder.disableReplication(flag);
8282
}
8383

84+
void pulsar_message_set_null_value(pulsar_message_t *message) { message->builder.setNullValue(); }
85+
8486
int pulsar_message_has_property(pulsar_message_t *message, const char *name) {
8587
return message->message.hasProperty(name);
8688
}
@@ -148,3 +150,5 @@ void pulsar_message_set_schema_version(pulsar_message_t *message, const char *sc
148150
const char *pulsar_message_get_producer_name(pulsar_message_t *message) {
149151
return message->message.getProducerName().c_str();
150152
}
153+
154+
int pulsar_message_has_null_value(pulsar_message_t *message) { return message->message.hasNullValue(); }

tests/BatchMessageTest.cc

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -988,6 +988,36 @@ TEST(BatchMessageTest, testParseMessageBatchEntry) {
988988
}
989989
}
990990

991+
TEST(BatchMessageTest, testParseMessageBatchEntryWithNullValue) {
992+
std::vector<Message> msgs;
993+
msgs.emplace_back(MessageBuilder().setPartitionKey("key1").setNullValue().build());
994+
msgs.emplace_back(MessageBuilder().setContent("content2").setPartitionKey("key2").build());
995+
msgs.emplace_back(MessageBuilder().setPartitionKey("key3").setNullValue().build());
996+
997+
SharedBuffer payload;
998+
Commands::serializeSingleMessagesToBatchPayload(payload, msgs);
999+
ASSERT_EQ(payload.writableBytes(), 0);
1000+
1001+
MessageBatch messageBatch;
1002+
auto fakeId = MessageIdBuilder().ledgerId(6000L).entryId(20L).partition(0).build();
1003+
messageBatch.withMessageId(fakeId).parseFrom(payload, static_cast<uint32_t>(msgs.size()));
1004+
const std::vector<Message>& messages = messageBatch.messages();
1005+
1006+
ASSERT_EQ(messages.size(), 3);
1007+
1008+
ASSERT_TRUE(messages[0].hasNullValue());
1009+
ASSERT_EQ(messages[0].getPartitionKey(), "key1");
1010+
ASSERT_EQ(messages[0].getLength(), 0);
1011+
1012+
ASSERT_FALSE(messages[1].hasNullValue());
1013+
ASSERT_EQ(messages[1].getPartitionKey(), "key2");
1014+
ASSERT_EQ(messages[1].getDataAsString(), "content2");
1015+
1016+
ASSERT_TRUE(messages[2].hasNullValue());
1017+
ASSERT_EQ(messages[2].getPartitionKey(), "key3");
1018+
ASSERT_EQ(messages[2].getLength(), 0);
1019+
}
1020+
9911021
TEST(BatchMessageTest, testSendCallback) {
9921022
const std::string topicName = "persistent://public/default/BasicMessageTest-testSendCallback";
9931023

tests/MessageTest.cc

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,3 +153,46 @@ TEST(MessageTest, testGetTopicNameOnProducerMessage) {
153153
auto msg = MessageBuilder().setContent("test").build();
154154
ASSERT_TRUE(msg.getTopicName().empty());
155155
}
156+
157+
TEST(MessageTest, testNullValueMessage) {
158+
{
159+
auto msg = MessageBuilder().setContent("test").build();
160+
ASSERT_FALSE(msg.hasNullValue());
161+
}
162+
163+
{
164+
auto msg = MessageBuilder().setNullValue().setPartitionKey("key1").build();
165+
ASSERT_TRUE(msg.hasNullValue());
166+
ASSERT_EQ(msg.getLength(), 0);
167+
ASSERT_EQ(msg.getPartitionKey(), "key1");
168+
}
169+
170+
{
171+
auto msg = MessageBuilder().setPartitionKey("key2").setNullValue().build();
172+
ASSERT_TRUE(msg.hasNullValue());
173+
ASSERT_EQ(msg.getPartitionKey(), "key2");
174+
}
175+
}
176+
177+
TEST(MessageTest, testEmptyMessage) {
178+
auto msg = MessageBuilder().build();
179+
ASSERT_FALSE(msg.hasNullValue());
180+
ASSERT_EQ(msg.getLength(), 0);
181+
}
182+
183+
TEST(MessageTest, testEmptyStringNotNullValue) {
184+
// Empty string message - has content set to ""
185+
auto emptyStringMsg = MessageBuilder().setContent("").build();
186+
ASSERT_FALSE(emptyStringMsg.hasNullValue());
187+
ASSERT_EQ(emptyStringMsg.getLength(), 0);
188+
ASSERT_EQ(emptyStringMsg.getDataAsString(), "");
189+
190+
// Null value message - explicitly marked as null
191+
auto nullValueMsg = MessageBuilder().setNullValue().setPartitionKey("key").build();
192+
ASSERT_TRUE(nullValueMsg.hasNullValue());
193+
ASSERT_EQ(nullValueMsg.getLength(), 0);
194+
195+
// Both have length 0, but they are semantically different
196+
// Empty string: the value IS an empty string
197+
// Null value: the value does not exist (tombstone for compaction)
198+
}

0 commit comments

Comments
 (0)