Skip to content

Commit 6ba48bc

Browse files
committed
feat: add replicateSubscriptionState to ConsumerConfig
Add support for replicateSubscriptionState option in ConsumerConfig, which enables geo-replication failover by synchronizing subscription cursor state across clusters. Fixes #478
1 parent 3f539b8 commit 6ba48bc

2 files changed

Lines changed: 9 additions & 0 deletions

File tree

index.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ export interface ConsumerConfig {
110110
deadLetterPolicy?: DeadLetterPolicy;
111111
batchReceivePolicy?: ConsumerBatchReceivePolicy;
112112
keySharedPolicy?: KeySharedPolicy;
113+
replicateSubscriptionState?: boolean;
113114
}
114115

115116
export class Consumer {

src/ConsumerConfig.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ static const std::string CFG_KEY_SHARED_POLICY_MODE = "keyShareMode";
6262
static const std::string CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER = "allowOutOfOrderDelivery";
6363
static const std::string CFG_KEY_SHARED_POLICY_STICKY_RANGES = "stickyRanges";
6464
static const std::string CFG_CRYPTO_KEY_READER = "cryptoKeyReader";
65+
static const std::string CFG_REPLICATE_SUBSCRIPTION_STATE = "replicateSubscriptionState";
6566

6667
static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
6768
{"Exclusive", pulsar_ConsumerExclusive},
@@ -400,6 +401,13 @@ void ConsumerConfig::InitConfig(std::shared_ptr<ThreadSafeDeferred> deferred,
400401
}
401402
this->cConsumerConfig.get()->consumerConfiguration.setKeySharedPolicy(cppKeySharedPolicy);
402403
}
404+
405+
if (consumerConfig.Has(CFG_REPLICATE_SUBSCRIPTION_STATE) &&
406+
consumerConfig.Get(CFG_REPLICATE_SUBSCRIPTION_STATE).IsBoolean()) {
407+
bool replicateSubscriptionState = consumerConfig.Get(CFG_REPLICATE_SUBSCRIPTION_STATE).ToBoolean();
408+
this->cConsumerConfig.get()->consumerConfiguration.setReplicateSubscriptionStateEnabled(
409+
replicateSubscriptionState);
410+
}
403411
}
404412

405413
ConsumerConfig::~ConsumerConfig() {

0 commit comments

Comments
 (0)