Skip to content

Commit bab8420

Browse files
committed
MINIFICPP-2797 Extend C Api with sslcontext, penalize, dynamic properties, flowfile id and size #2166
1 parent d9910d0 commit bab8420

4 files changed

Lines changed: 102 additions & 16 deletions

File tree

extension-framework/cpp-extension-lib/src/core/ProcessSession.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,29 +118,29 @@ void ProcessSession::read(FlowFile& flow_file, const io::InputStreamCallback& ca
118118

119119
void ProcessSession::setAttribute(FlowFile& ff, const std::string_view key, std::string value) { // NOLINT(performance-unnecessary-value-param)
120120
const MinifiStringView value_ref = utils::toStringView(value);
121-
if (MINIFI_STATUS_SUCCESS != MinifiFlowFileSetAttribute(impl_, ff.get(), utils::toStringView(key), &value_ref)) {
121+
if (MINIFI_STATUS_SUCCESS != MinifiProcessSessionSetFlowFileAttribute(impl_, ff.get(), utils::toStringView(key), &value_ref)) {
122122
throw minifi::Exception(minifi::FILE_OPERATION_EXCEPTION, "Failed to set attribute");
123123
}
124124
}
125125

126126
void ProcessSession::removeAttribute(FlowFile& ff, const std::string_view key) {
127-
if (MINIFI_STATUS_SUCCESS != MinifiFlowFileSetAttribute(impl_, ff.get(), utils::toStringView(key), nullptr)) {
127+
if (MINIFI_STATUS_SUCCESS != MinifiProcessSessionSetFlowFileAttribute(impl_, ff.get(), utils::toStringView(key), nullptr)) {
128128
throw minifi::Exception(minifi::FILE_OPERATION_EXCEPTION, "Failed to remove attribute");
129129
}
130130
}
131131

132132
std::optional<std::string> ProcessSession::getAttribute(FlowFile& ff, std::string_view key) {
133133
std::optional<std::string> result;
134-
MinifiFlowFileGetAttribute(impl_, ff.get(), utils::toStringView(key), [] (void* user_ctx, MinifiStringView value) {
134+
MinifiProcessSessionGetFlowFileAttribute(impl_, ff.get(), utils::toStringView(key), [] (void* user_ctx, MinifiStringView value) {
135135
*static_cast<std::optional<std::string>*>(user_ctx) = std::string{value.data, value.length};
136136
}, &result);
137137
return result;
138138
}
139139

140140
std::map<std::string, std::string> ProcessSession::getAttributes(FlowFile& ff) {
141141
std::map<std::string, std::string> result;
142-
MinifiFlowFileGetAttributes(impl_, ff.get(), [] (void* user_ctx, MinifiStringView value, MinifiStringView key) {
143-
static_cast<std::map<std::string, std::string>*>(user_ctx)->insert({std::string{value.data, value.length}, std::string{key.data, key.length}});
142+
MinifiProcessSessionGetFlowFileAttributes(impl_, ff.get(), [] (void* user_ctx, const MinifiStringView key, const MinifiStringView value) {
143+
static_cast<std::map<std::string, std::string>*>(user_ctx)->insert({std::string{key.data, key.length}, std::string{value.data, value.length}});
144144
}, &result);
145145
return result;
146146
}

libminifi/src/minifi-c.cpp

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "core/ProcessorMetrics.h"
2525
#include "core/extension/ExtensionManager.h"
2626
#include "minifi-cpp/Exception.h"
27+
#include "minifi-cpp/controllers/SSLContextServiceInterface.h"
2728
#include "minifi-cpp/core/Annotation.h"
2829
#include "minifi-cpp/core/ClassLoader.h"
2930
#include "minifi-cpp/core/ProcessContext.h"
@@ -35,8 +36,8 @@
3536
#include "minifi-cpp/core/PropertyValidator.h"
3637
#include "minifi-cpp/core/logging/Logger.h"
3738
#include "minifi-cpp/core/state/PublishedMetricProvider.h"
38-
#include "utils/CProcessor.h"
3939
#include "utils/CControllerService.h"
40+
#include "utils/CProcessor.h"
4041
#include "utils/PropertyErrors.h"
4142

4243
namespace minifi = org::apache::nifi::minifi;
@@ -435,6 +436,18 @@ MINIFI_OWNED MinifiFlowFile* MinifiProcessSessionCreate(MinifiProcessSession* se
435436
return MINIFI_NULL;
436437
}
437438

439+
MinifiStatus MinifiProcessSessionPenalize(MinifiProcessSession* session, MinifiFlowFile* flowfile) {
440+
gsl_Assert(session != MINIFI_NULL);
441+
gsl_Assert(flowfile != MINIFI_NULL);
442+
try {
443+
reinterpret_cast<minifi::core::ProcessSession*>(session)->penalize(
444+
*reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(flowfile));
445+
return MINIFI_STATUS_SUCCESS;
446+
} catch (...) {
447+
return MINIFI_STATUS_UNKNOWN_ERROR;
448+
}
449+
}
450+
438451
MinifiStatus MinifiProcessSessionTransfer(MinifiProcessSession* session, MINIFI_OWNED MinifiFlowFile* flowfile, MinifiStringView relationship_name) {
439452
gsl_Assert(session != MINIFI_NULL);
440453
gsl_Assert(flowfile != MINIFI_NULL);
@@ -501,7 +514,7 @@ int64_t MinifiOutputStreamWrite(MinifiOutputStream* stream, const char* data, si
501514
return gsl::narrow<int64_t>(reinterpret_cast<minifi::io::OutputStream*>(stream)->write(as_bytes(std::span(data, size))));
502515
}
503516

504-
MinifiStatus MinifiFlowFileSetAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, const MinifiStringView* attribute_value) {
517+
MinifiStatus MinifiProcessSessionSetFlowFileAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, const MinifiStringView* attribute_value) {
505518
gsl_Assert(session != MINIFI_NULL);
506519
gsl_Assert(flowfile != MINIFI_NULL);
507520
if (attribute_value == nullptr) {
@@ -513,7 +526,7 @@ MinifiStatus MinifiFlowFileSetAttribute(MinifiProcessSession* session, MinifiFlo
513526
return MINIFI_STATUS_SUCCESS;
514527
}
515528

516-
MinifiBool MinifiFlowFileGetAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name,
529+
MinifiBool MinifiProcessSessionGetFlowFileAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name,
517530
void(*cb)(void* user_ctx, MinifiStringView attribute_value), void* user_ctx) {
518531
gsl_Assert(session != MINIFI_NULL);
519532
gsl_Assert(flowfile != MINIFI_NULL);
@@ -525,7 +538,7 @@ MinifiBool MinifiFlowFileGetAttribute(MinifiProcessSession* session, MinifiFlowF
525538
return true;
526539
}
527540

528-
void MinifiFlowFileGetAttributes(MinifiProcessSession* session, MinifiFlowFile* flowfile,
541+
void MinifiProcessSessionGetFlowFileAttributes(MinifiProcessSession* session, MinifiFlowFile* flowfile,
529542
void(*cb)(void* user_ctx, MinifiStringView attribute_name, MinifiStringView attribute_value), void* user_ctx) {
530543
gsl_Assert(session != MINIFI_NULL);
531544
gsl_Assert(flowfile != MINIFI_NULL);
@@ -534,6 +547,20 @@ void MinifiFlowFileGetAttributes(MinifiProcessSession* session, MinifiFlowFile*
534547
}
535548
}
536549

550+
uint64_t MinifiProcessSessionGetFlowFileSize(MinifiProcessSession* session, MinifiFlowFile* flowfile) {
551+
gsl_Assert(session != MINIFI_NULL);
552+
gsl_Assert(flowfile != MINIFI_NULL);
553+
return (*reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(flowfile))->getSize();
554+
}
555+
556+
MinifiStatus MinifiProcessSessionGetFlowFileId(MinifiProcessSession* session, MinifiFlowFile* flowfile, void(*cb)(void* user_ctx, MinifiStringView flow_file_id), void* user_ctx) {
557+
gsl_Assert(session != MINIFI_NULL);
558+
gsl_Assert(flowfile != MINIFI_NULL);
559+
const auto uuid_small_str = (*reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(flowfile))->getUUIDStr();
560+
cb(user_ctx, minifiStringView(uuid_small_str.view()));
561+
return MINIFI_STATUS_SUCCESS;
562+
}
563+
537564
MinifiStatus MinifiControllerServiceContextGetProperty(MinifiControllerServiceContext* context, MinifiStringView property_name,
538565
void (*result_cb)(void* user_ctx, MinifiStringView result), void* user_ctx) {
539566
gsl_Assert(context != MINIFI_NULL);
@@ -551,7 +578,6 @@ MinifiStatus MinifiControllerServiceContextGetProperty(MinifiControllerServiceCo
551578
}
552579
}
553580

554-
555581
MinifiStatus MinifiProcessContextGetControllerService(
556582
MinifiProcessContext* process_context,
557583
const MinifiStringView controller_service_name,
@@ -578,5 +604,40 @@ MinifiStatus MinifiProcessContextGetControllerService(
578604
return MINIFI_STATUS_VALIDATION_FAILED;
579605
}
580606

607+
void MinifiProcessContextGetDynamicProperties(MinifiProcessContext* context, MinifiFlowFile* minifi_flow_file,
608+
void (*cb)(void* user_ctx, MinifiStringView dynamic_property_name, MinifiStringView dynamic_property_value), void* user_ctx) {
609+
gsl_Assert(context != MINIFI_NULL);
610+
auto flow_file = minifi_flow_file != MINIFI_NULL ? reinterpret_cast<std::shared_ptr<minifi::core::FlowFile>*>(minifi_flow_file)->get() : nullptr;
611+
for (auto& [key, value] : reinterpret_cast<minifi::core::ProcessContext*>(context)->getDynamicProperties(flow_file)) {
612+
cb(user_ctx, minifiStringView(key), minifiStringView(value));
613+
}
614+
}
615+
616+
MinifiStatus MinifiProcessContextGetSslData(MinifiProcessContext* process_context, MinifiStringView controller_service_name,
617+
void (*cb)(void* user_ctx, const MinifiSslData* ssl_data), void* user_ctx) {
618+
gsl_Assert(process_context != MINIFI_NULL);
619+
const auto context = reinterpret_cast<minifi::core::ProcessContext*>(process_context);
620+
const auto name_str = std::string{toStringView(controller_service_name)};
621+
const auto service_shared_ptr = context->getControllerService(name_str, context->getProcessorInfo().getUUID());
622+
if (!service_shared_ptr) { return MINIFI_STATUS_VALIDATION_FAILED; }
623+
if (const auto ssl_context_service = dynamic_cast<minifi::controllers::SSLContextServiceInterface*>(service_shared_ptr.get())) {
624+
const std::string ca_cert_file = ssl_context_service->getCACertificate().string();
625+
const std::string passphrase = ssl_context_service->getPassphrase();
626+
const std::string cert_file = ssl_context_service->getCertificateFile().string();
627+
const std::string private_key_file = ssl_context_service->getPrivateKeyFile().string();
628+
629+
MinifiSslData ssl_data{
630+
.version = 1,
631+
.ca_certificate_file = minifiStringView(ca_cert_file),
632+
.certificate_file = minifiStringView(cert_file),
633+
.private_key_file = minifiStringView(private_key_file),
634+
.passphrase = minifiStringView(passphrase),
635+
};
636+
cb(user_ctx, &ssl_data);
637+
return MINIFI_STATUS_SUCCESS;
638+
}
639+
return MINIFI_STATUS_VALIDATION_FAILED;
640+
}
641+
581642

582643
} // extern "C"

minifi-api/include/minifi-c/minifi-c.h

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ extern "C" {
4141
#define MINIFI_REGISTER_EXTENSION_FN MinifiRegisterExtension
4242
#endif
4343

44+
/// To allow the proper usage of SSLContextServices set the MinifiPropertyDefinition::type to MINIFI_SSL_CONTEXT_SERVICE_PROPERTY_TYPE
45+
#define MINIFI_SSL_CONTEXT_SERVICE_PROPERTY_TYPE "org.apache.nifi.minifi.controllers.SSLContextServiceInterface"
46+
4447
enum : uint32_t {
4548
MINIFI_API_VERSION = 3
4649
};
@@ -231,6 +234,8 @@ MinifiBool MinifiProcessContextHasNonEmptyProperty(MinifiProcessContext* context
231234

232235
MinifiStatus MinifiProcessContextGetControllerService(
233236
MinifiProcessContext* process_context, MinifiStringView controller_service_name, MinifiStringView controller_service_type, MinifiControllerService** controller_service_out);
237+
void MinifiProcessContextGetDynamicProperties(MinifiProcessContext* context, MinifiFlowFile* minifi_flow_file,
238+
void (*cb)(void* user_ctx, MinifiStringView dynamic_property_name, MinifiStringView dynamic_property_value), void* user_ctx);
234239

235240
void MinifiLoggerSetMaxLogSize(MinifiLogger*, int32_t);
236241
void MinifiLoggerLogString(MinifiLogger*, MinifiLogLevel, MinifiStringView);
@@ -240,6 +245,7 @@ MinifiLogLevel MinifiLoggerLevel(MinifiLogger*);
240245
MINIFI_OWNED MinifiFlowFile* MinifiProcessSessionGet(MinifiProcessSession*);
241246
MINIFI_OWNED MinifiFlowFile* MinifiProcessSessionCreate(MinifiProcessSession* session, MinifiFlowFile* parent_flowfile);
242247

248+
MinifiStatus MinifiProcessSessionPenalize(MinifiProcessSession* session, MinifiFlowFile* flowfile);
243249
MinifiStatus MinifiProcessSessionTransfer(MinifiProcessSession* session, MINIFI_OWNED MinifiFlowFile* flowfile, MinifiStringView relationship_name);
244250
MinifiStatus MinifiProcessSessionRemove(MinifiProcessSession* session, MINIFI_OWNED MinifiFlowFile* flowfile);
245251

@@ -253,16 +259,30 @@ size_t MinifiInputStreamSize(MinifiInputStream*);
253259
int64_t MinifiInputStreamRead(MinifiInputStream* stream, char* buffer, size_t size);
254260
int64_t MinifiOutputStreamWrite(MinifiOutputStream* stream, const char* data, size_t size);
255261

256-
MinifiStatus MinifiFlowFileSetAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, const MinifiStringView* attribute_value);
257-
MinifiBool MinifiFlowFileGetAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name,
262+
MinifiStatus MinifiProcessSessionSetFlowFileAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name, const MinifiStringView* attribute_value);
263+
MinifiBool MinifiProcessSessionGetFlowFileAttribute(MinifiProcessSession* session, MinifiFlowFile* flowfile, MinifiStringView attribute_name,
258264
void(*cb)(void* user_ctx, MinifiStringView attribute_value), void* user_ctx);
259-
void MinifiFlowFileGetAttributes(MinifiProcessSession* session, MinifiFlowFile* flowfile, void(*cb)(void* user_ctx, MinifiStringView attribute_name, MinifiStringView attribute_value), void* user_ctx);
265+
void MinifiProcessSessionGetFlowFileAttributes(MinifiProcessSession* session, MinifiFlowFile* flowfile,
266+
void (*cb)(void* user_ctx, MinifiStringView attribute_name, MinifiStringView attribute_value), void* user_ctx);
267+
uint64_t MinifiProcessSessionGetFlowFileSize(MinifiProcessSession* session, MinifiFlowFile* flowfile);
268+
MinifiStatus MinifiProcessSessionGetFlowFileId(MinifiProcessSession* session, MinifiFlowFile* flowfile, void(*cb)(void* user_ctx, MinifiStringView flow_file_id), void* user_ctx);
260269

261270
MinifiStatus MinifiControllerServiceContextGetProperty(MinifiControllerServiceContext* context,
262271
MinifiStringView property_name,
263272
void(*cb)(void* user_ctx, MinifiStringView property_value),
264273
void* user_ctx);
265274

275+
struct MinifiSslData {
276+
uint8_t version;
277+
MinifiStringView ca_certificate_file;
278+
MinifiStringView certificate_file;
279+
MinifiStringView private_key_file;
280+
MinifiStringView passphrase;
281+
};
282+
283+
MinifiStatus MinifiProcessContextGetSslData(MinifiProcessContext* process_context, MinifiStringView controller_service_name,
284+
void (*cb)(void* user_ctx, const MinifiSslData* ssl_data), void* user_ctx);
285+
266286
#ifdef __cplusplus
267287
} // extern "C"
268288
#endif // __cplusplus

minifi-api/minifi-c-api.def

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,17 @@ EXPORTS
1616
MinifiProcessSessionCreate
1717
MinifiProcessSessionTransfer
1818
MinifiProcessSessionRemove
19+
MinifiProcessSessionPenalize
1920
MinifiProcessSessionRead
2021
MinifiProcessSessionWrite
2122
MinifiConfigGet
2223
MinifiInputStreamSize
2324
MinifiInputStreamRead
2425
MinifiOutputStreamWrite
25-
MinifiFlowFileSetAttribute
26-
MinifiFlowFileGetAttribute
27-
MinifiFlowFileGetAttributes
26+
MinifiProcessSessionSetFlowFileAttribute
27+
MinifiProcessSessionGetFlowFileAttribute
28+
MinifiProcessSessionGetFlowFileAttributes
29+
MinifiProcessSessionGetFlowFileSize
30+
MinifiProcessSessionGetFlowFileId
31+
MinifiProcessContextGetDynamicProperties
32+
MinifiProcessContextGetSslData

0 commit comments

Comments
 (0)