MINIFICPP-2769 Move Kafka Extension to stable C API#2175
Conversation
| namespace org::apache::nifi::minifi::processors { | ||
| // The upper limit for Max Poll Time is 4 seconds. This is because Watchdog would potentially start | ||
| // reporting issues with the processor health otherwise | ||
| bool consume_kafka::ConsumeKafkaMaxPollTimePropertyValidator::validate(const std::string_view input) const { |
There was a problem hiding this comment.
Removed this custom validator (because C Api only supports standard validators ATM), and added a custom validation into onSchedule
| } | ||
|
|
||
| namespace { | ||
| void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t trigger, rd_kafka_topic_partition_list_t* partitions, void* /*opaque*/) { |
There was a problem hiding this comment.
I've moved this rebalence_cb into a helper class thats available through an opaque handle (needed for logging which is not available anymore through statis functions)
https://github.com/apache/nifi-minifi-cpp/pull/2175/changes#diff-93908b3b2326640601fa590e78c35c046654a1ac90c6bf70dcd7dd1bcf0d2c5cR73
| setKafkaConfigurationField(*conf_, "isolation.level", utils::parseBoolProperty(context, HonorTransactions) ? "read_committed" : "read_uncommitted"); | ||
| setKafkaConfigurationField(*conf_, "group.id", utils::parseProperty(context, GroupID)); | ||
| setKafkaConfigurationField(*conf_, "client.id", this->getUUIDStr()); | ||
| // setKafkaConfigurationField(*conf_, "client.id", client_id); No need to set id since its autogenerated, and we don't access it anywhere from minifi |
There was a problem hiding this comment.
We dont have access to the processor's UUID, but this seems unnessary anyways, any other ideas are welcome :D
| topics_[topicName] = topic; | ||
| } | ||
|
|
||
| void KafkaConnection::logCallback(const rd_kafka_t* rk, const int level, const char* /*fac*/, const char* buf) { |
There was a problem hiding this comment.
This is also been moved into KafkaOpaque class https://github.com/apache/nifi-minifi-cpp/pull/2175/changes#diff-93908b3b2326640601fa590e78c35c046654a1ac90c6bf70dcd7dd1bcf0d2c5cR46
There was a problem hiding this comment.
Pull request overview
This PR migrates the Kafka extension (and some related extension infrastructure) toward the stable C API / cpp-extension-lib surface by refactoring processor implementations to api::core::* interfaces, switching extension registration to the C-API initializer path, and adjusting supporting utilities/tests accordingly.
Changes:
- Rename/process-context SSL data C API entry point and wire it through the CFFI
ProcessContextwrapper. - Refactor Kafka processors (PublishKafka/ConsumeKafka) to
api::core::{ProcessContext,ProcessSession}and introduceKafkaOpaquefor librdkafka callbacks/logging. - Update extension initializers/CMake targets (Kafka + LlamaCpp) and modernize Kafka tests to use the mock C-API framework.
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| minifi-api/minifi-c-api.def | Updates exported C API symbol list for SSL data retrieval. |
| minifi-api/include/minifi-c/minifi-c.h | Renames the SSL data retrieval C API declaration. |
| libminifi/src/minifi-c.cpp | Implements renamed SSL data retrieval function by resolving controller service name from a property. |
| extensions/llamacpp/processors/RunLlamaCppInference.cpp | Switches to parseOptionalProperty helper for optional property parsing. |
| extensions/llamacpp/ExtensionInitializer.cpp | Updates include path and registers processors via C API extension initializer helper. |
| extensions/llamacpp/CMakeLists.txt | Ensures ExtensionInitializer.cpp is part of the build sources. |
| extensions/kafka/tests/PublishKafkaTests.cpp | Replaces legacy test harness with mock C-API process context/session based tests. |
| extensions/kafka/tests/CMakeLists.txt | Updates test linking/runtime output settings for the new test approach. |
| extensions/kafka/rdkafka_utils.h | Refactors utilities and introduces KafkaOpaque for callbacks; updates includes/types. |
| extensions/kafka/rdkafka_utils.cpp | Implements KafkaOpaque log + rebalance callbacks and topic list debug printing. |
| extensions/kafka/PublishKafka.h | Migrates processor interface to api::core and updates SSL/type handling. |
| extensions/kafka/PublishKafka.cpp | Migrates scheduling/triggering/session interactions to api::core and updates config callbacks. |
| extensions/kafka/KafkaProcessorBase.h | Migrates base class to api::core::ProcessorImpl and updates SSL property allowed type. |
| extensions/kafka/KafkaProcessorBase.cpp | Reworks SSL retrieval and authentication parameter setup for the new API surface. |
| extensions/kafka/KafkaConnection.h | Removes legacy logger plumbing and static log callback approach. |
| extensions/kafka/KafkaConnection.cpp | Removes legacy logger mapping/callback implementation. |
| extensions/kafka/ExtensionInitializer.cpp | Adds C API extension initializer registering Kafka processors. |
| extensions/kafka/ConsumeKafka.h | Migrates processor interface to api::core, updates validator usage, refactors helper signatures. |
| extensions/kafka/ConsumeKafka.cpp | Migrates scheduling/triggering/session interactions to api::core and updates callbacks/config handling. |
| extensions/kafka/CMakeLists.txt | Renames the extension library target and registers it as a C-API extension. |
| extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp | Routes SSL retrieval through the renamed C API function. |
| extension-framework/cpp-extension-lib/mocklib/src/mock-minifi-c.cpp | Updates mocked symbol name and wraps exports in extern "C". |
| extension-framework/cpp-extension-lib/mocklib/CMakeLists.txt | Renames mock library target to mock-minifi. |
| core-framework/common/include/core/PropertyDefinitionBuilder.h | Adds string-literal-based withAllowedType() support for stable allowed-type storage. |
| CMakeLists.txt | Updates default-enabled extension name from the old Kafka target to the new one. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
2878fa2 to
527195a
Compare
97edf58 to
e4f700e
Compare
527195a to
cf8d253
Compare
e4f700e to
c9060a8
Compare
208447e to
1985dd0
Compare
4d18c0d to
0a3ae8b
Compare
1985dd0 to
19d57ec
Compare
Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically main)?
Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.