Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/pulsar/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ class PULSAR_PUBLIC Message {
*/
const std::string& getProducerName() const noexcept;

/**
* Get the source cluster from which the message was replicated.
*
* @return the optional pointer to the source cluster name if the message was replicated, the pointer is
* valid as the Message instance is alive
*/
std::optional<const std::string*> getReplicatedFrom() const;
Comment thread
BewareMyPower marked this conversation as resolved.
Comment thread
BewareMyPower marked this conversation as resolved.

/**
* @return the optional encryption context that is present when the message is encrypted, the pointer is
* valid as the Message instance is alive
Expand Down
9 changes: 9 additions & 0 deletions include/pulsar/c/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,15 @@ PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message,
*/
PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message);

/**
* Get the source cluster from which the message was replicated.
*
* The pointer points to internal storage owned by the message wrapper, so the caller should not free it.
*
* @return the source cluster name, or NULL if the message is not replicated
*/
PULSAR_PUBLIC const char *pulsar_message_get_replicated_from(pulsar_message_t *message);

/**
* Check if the message has a null value.
*
Expand Down
7 changes: 7 additions & 0 deletions lib/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@ const std::string& Message::getProducerName() const noexcept {
return impl_->metadata.producer_name();
}

std::optional<const std::string*> Message::getReplicatedFrom() const {
if (!impl_ || !impl_->metadata.has_replicated_from()) {
return std::nullopt;
}
return &impl_->metadata.replicated_from();
}
Comment thread
BewareMyPower marked this conversation as resolved.
Comment thread
BewareMyPower marked this conversation as resolved.

std::optional<const EncryptionContext*> Message::getEncryptionContext() const {
if (!impl_ || !impl_->encryptionContext_.has_value()) {
return std::nullopt;
Expand Down
9 changes: 9 additions & 0 deletions lib/c/c_Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,13 @@ const char *pulsar_message_get_producer_name(pulsar_message_t *message) {
return message->message.getProducerName().c_str();
}

const char *pulsar_message_get_replicated_from(pulsar_message_t *message) {
const auto replicatedFrom = message->message.getReplicatedFrom();
if (!replicatedFrom) {
return NULL;
}

return message->message.getReplicatedFrom().value()->c_str();
}
Comment thread
BewareMyPower marked this conversation as resolved.
Comment thread
BewareMyPower marked this conversation as resolved.

int pulsar_message_has_null_value(pulsar_message_t *message) { return message->message.hasNullValue(); }
12 changes: 12 additions & 0 deletions tests/MessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <string>

#include "PulsarFriend.h"
#include "lib/MessageImpl.h"

using namespace pulsar;
Expand Down Expand Up @@ -154,6 +155,17 @@ TEST(MessageTest, testGetTopicNameOnProducerMessage) {
ASSERT_TRUE(msg.getTopicName().empty());
}

TEST(MessageTest, testReplicationMetadataAccessors) {
auto msg = MessageBuilder().setContent("test").build();
ASSERT_FALSE(msg.getReplicatedFrom().has_value());

PulsarFriend::getMessageMetadata(msg).set_replicated_from("us-west1");

const auto replicatedFrom = msg.getReplicatedFrom();
ASSERT_TRUE(replicatedFrom.has_value());
ASSERT_EQ(*replicatedFrom.value(), "us-west1");
}

TEST(MessageTest, testNullValueMessage) {
{
auto msg = MessageBuilder().setContent("test").build();
Expand Down
16 changes: 16 additions & 0 deletions tests/c/c_MessageTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <lib/c/c_structs.h>
#include <pulsar/c/message.h>

#include "../PulsarFriend.h"

TEST(c_MessageTest, MessageCopy) {
pulsar_message_t *from = pulsar_message_create();
pulsar_message_set_content(from, "hello", 5);
Expand All @@ -32,3 +34,17 @@ TEST(c_MessageTest, MessageCopy) {
pulsar_message_free(from);
pulsar_message_free(to);
}

TEST(c_MessageTest, ReplicationMetadataAccessors) {
pulsar_message_t *message = pulsar_message_create();
pulsar_message_set_content(message, "hello", 5);
message->message = message->builder.build();

ASSERT_EQ(nullptr, pulsar_message_get_replicated_from(message));

PulsarFriend::getMessageMetadata(message->message).set_replicated_from("us-west1");

ASSERT_STREQ("us-west1", pulsar_message_get_replicated_from(message));

pulsar_message_free(message);
}
Loading