Skip to content

Commit 3a1c685

Browse files
committed
review changes
1 parent 71f4c95 commit 3a1c685

4 files changed

Lines changed: 8 additions & 8 deletions

File tree

extensions/kafka/ConsumeKafka.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ void ConsumeKafka::configureNewConnection(api::core::ProcessContext& context) {
114114
// Registering a rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that
115115
// responsibility to the application's rebalance_cb.
116116
if (commit_policy_ != consume_kafka::CommitPolicyEnum::CommitFromIncomingFlowFiles) {
117-
rd_kafka_conf_set_rebalance_cb(conf_.get(), utils::KafkaOpaque::rebalance_cb);
117+
rd_kafka_conf_set_rebalance_cb(conf_.get(), utils::KafkaOpaque::rebalanceCallback);
118118
}
119119

120120
setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));

extensions/kafka/PublishKafka.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -595,7 +595,7 @@ bool PublishKafka::createNewTopic(const api::core::ProcessContext& context, cons
595595
}
596596

597597
std::optional<api::utils::net::SslData> PublishKafka::getSslData(api::core::ProcessContext& context) const {
598-
if (auto result = KafkaProcessorBase::getSslData(context); result) { return result; }
598+
if (auto result = context.getSslData(SSLContextService); result) { return *result; }
599599

600600
api::utils::net::SslData ssl_data;
601601
if (auto security_ca = context.getProperty(SecurityCA, nullptr)) { ssl_data.ca_loc = *security_ca; }

extensions/kafka/rdkafka_utils.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ void setKafkaConfigurationField(rd_kafka_conf_t& configuration, const std::strin
3636
}
3737
}
3838

39-
void KafkaOpaque::print_topics_list(const rd_kafka_topic_partition_list_t& kf_topic_partition_list) const {
39+
void KafkaOpaque::printTopicsList(const rd_kafka_topic_partition_list_t& kf_topic_partition_list) const {
4040
if (!logger_.should_log(core::logging::debug))
4141
return;
4242
for (int i = 0; i < kf_topic_partition_list.cnt; ++i) {
@@ -71,7 +71,7 @@ void KafkaOpaque::logCallback(const rd_kafka_t* rk, const int level, const char*
7171
default: gsl_FailFast();
7272
}
7373
}
74-
void KafkaOpaque::rebalance_cb(rd_kafka_t* rk, const rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* opaque_ptr) {
74+
void KafkaOpaque::rebalanceCallback(rd_kafka_t* rk, const rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* opaque_ptr) {
7575
const auto* kafka_opaque = static_cast<KafkaOpaque*>(opaque_ptr);
7676
if (!kafka_opaque) {
7777
return;
@@ -81,14 +81,14 @@ void KafkaOpaque::rebalance_cb(rd_kafka_t* rk, const rd_kafka_resp_err_t trigger
8181
switch (trigger) {
8282
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
8383
kafka_opaque->logger_.log_debug("assigned:");
84-
kafka_opaque->print_topics_list(*partitions);
84+
kafka_opaque->printTopicsList(*partitions);
8585
assign_error = rd_kafka_assign(rk, partitions);
8686
break;
8787

8888
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
8989
kafka_opaque->logger_.log_debug("revoked:");
9090
rd_kafka_commit(rk, partitions, /* async = */ 0); // Sync commit, maybe unnecessary
91-
kafka_opaque->print_topics_list(*partitions);
91+
kafka_opaque->printTopicsList(*partitions);
9292

9393
assign_error = rd_kafka_assign(rk, nullptr);
9494
break;

extensions/kafka/rdkafka_utils.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,9 @@ class KafkaOpaque {
100100
public:
101101
explicit KafkaOpaque(core::logging::Logger& logger) : logger_(logger) {}
102102

103-
void print_topics_list(const rd_kafka_topic_partition_list_t& kf_topic_partition_list) const;
103+
void printTopicsList(const rd_kafka_topic_partition_list_t& kf_topic_partition_list) const;
104104
static void logCallback(const rd_kafka_t* rk, int level, const char* /*fac*/, const char* buf);
105-
static void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* opaque_ptr);
105+
static void rebalanceCallback(rd_kafka_t* rk, rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* opaque_ptr);
106106

107107
private:
108108
core::logging::Logger& logger_;

0 commit comments

Comments
 (0)