Skip to content

Commit ac458cf

Browse files
committed
MINIFICPP-2764 Controller Service CPP API
1 parent f7eea48 commit ac458cf

15 files changed

Lines changed: 379 additions & 52 deletions

File tree

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#pragma once
19+
20+
#include <string>
21+
22+
#include "minifi-c.h"
23+
#include "minifi-cpp/core/PropertyDefinition.h"
24+
#include "nonstd/expected.hpp"
25+
26+
namespace org::apache::nifi::minifi::api::core {
27+
28+
class ControllerServiceContext {
29+
public:
30+
explicit ControllerServiceContext(MinifiControllerServiceContext* impl) : impl_(impl) {}
31+
32+
nonstd::expected<std::string, std::error_code> getProperty(std::string_view name) const;
33+
nonstd::expected<std::string, std::error_code> getProperty(const minifi::core::PropertyReference& property_reference) const {
34+
return getProperty(property_reference.name);
35+
}
36+
37+
private:
38+
MinifiControllerServiceContext* impl_;
39+
};
40+
41+
} // namespace org::apache::nifi::minifi::api::core
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#pragma once
18+
19+
#include <memory>
20+
#include <string>
21+
22+
#include "ControllerServiceContext.h"
23+
#include "minifi-cpp/core/ControllerServiceMetadata.h"
24+
#include "minifi-cpp/utils/Id.h"
25+
#include "utils/SmallString.h"
26+
27+
namespace org::apache::nifi::minifi::api {
28+
29+
class Connection;
30+
31+
namespace core {
32+
33+
class ControllerServiceImpl {
34+
public:
35+
explicit ControllerServiceImpl(minifi::core::ControllerServiceMetadata metadata);
36+
37+
ControllerServiceImpl(const ControllerServiceImpl&) = delete;
38+
ControllerServiceImpl(ControllerServiceImpl&&) = delete;
39+
ControllerServiceImpl& operator=(const ControllerServiceImpl&) = delete;
40+
ControllerServiceImpl& operator=(ControllerServiceImpl&&) = delete;
41+
42+
virtual ~ControllerServiceImpl();
43+
44+
MinifiStatus enable(ControllerServiceContext&);
45+
void notifyStop();
46+
47+
[[nodiscard]] std::string getName() const;
48+
[[nodiscard]] minifi::utils::Identifier getUUID() const;
49+
[[nodiscard]] minifi::utils::SmallString<36> getUUIDStr() const;
50+
51+
protected:
52+
virtual MinifiStatus enableImpl(api::core::ControllerServiceContext&) = 0;
53+
virtual void notifyStopImpl() {}
54+
55+
minifi::core::ControllerServiceMetadata metadata_;
56+
57+
std::shared_ptr<minifi::core::logging::Logger> logger_;
58+
};
59+
60+
} // namespace core
61+
} // namespace org::apache::nifi::minifi::api

extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class ProcessContext {
3434
nonstd::expected<std::string, std::error_code> getProperty(const minifi::core::PropertyReference& property_reference, const FlowFile* flow_file = nullptr) const {
3535
return getProperty(property_reference.name, flow_file);
3636
}
37+
nonstd::expected<MinifiControllerService*, std::error_code> getControllerService(std::string_view controller_service_name, std::string_view controller_service_class) const;
3738

3839
bool hasNonEmptyProperty(std::string_view name) const;
3940

extension-framework/cpp-extension-lib/include/api/core/ProcessorImpl.h

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717
#pragma once
1818

1919
#include <atomic>
20-
#include <condition_variable>
2120
#include <memory>
22-
#include <mutex>
2321
#include <string>
2422
#include <vector>
2523

@@ -53,7 +51,7 @@ class ProcessorImpl {
5351

5452
virtual ~ProcessorImpl();
5553

56-
void setTriggerWhenEmpty(bool trigger_when_empty) {
54+
void setTriggerWhenEmpty(const bool trigger_when_empty) {
5755
trigger_when_empty_ = trigger_when_empty;
5856
}
5957

@@ -74,8 +72,8 @@ class ProcessorImpl {
7472
static constexpr auto OutputAttributes = std::array<minifi::core::OutputAttributeReference, 0>{};
7573

7674
std::string getName() const;
77-
utils::Identifier getUUID() const;
78-
utils::SmallString<36> getUUIDStr() const;
75+
minifi::utils::Identifier getUUID() const;
76+
minifi::utils::SmallString<36> getUUIDStr() const;
7977

8078
virtual PublishedMetrics calculateMetrics() const {return {};}
8179

@@ -89,9 +87,6 @@ class ProcessorImpl {
8987
std::atomic<bool> trigger_when_empty_;
9088

9189
std::shared_ptr<minifi::core::logging::Logger> logger_;
92-
93-
private:
94-
mutable std::mutex mutex_;
9590
};
9691

9792
} // namespace core

extension-framework/cpp-extension-lib/include/api/core/Resource.h

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,16 @@
2626
#define WIN32_LEAN_AND_MEAN 1
2727
#endif
2828

29-
#include "minifi-c.h"
30-
#include "core/ClassName.h"
31-
#include "api/utils/minifi-c-utils.h"
29+
#include "ControllerServiceContext.h"
30+
#include "FlowFile.h"
3231
#include "ProcessContext.h"
3332
#include "ProcessSession.h"
34-
#include "FlowFile.h"
35-
#include "minifi-cpp/core/ProcessorMetadata.h"
33+
#include "api/utils/minifi-c-utils.h"
34+
#include "core/ClassName.h"
3635
#include "logging/Logger.h"
36+
#include "minifi-c.h"
37+
#include "minifi-cpp/core/ControllerServiceMetadata.h"
38+
#include "minifi-cpp/core/ProcessorMetadata.h"
3739

3840
namespace org::apache::nifi::minifi::api::core {
3941

@@ -94,11 +96,12 @@ void useProcessorClassDescription(Fn&& fn) {
9496

9597
.callbacks = MinifiProcessorCallbacks{
9698
.create = [] (MinifiProcessorMetadata metadata) -> MINIFI_OWNED void* {
97-
return new Class{minifi::core::ProcessorMetadata{
98-
.uuid = minifi::utils::Identifier::parse(std::string{metadata.uuid.data, metadata.uuid.length}).value(),
99-
.name = std::string{metadata.name.data, metadata.name.length},
100-
.logger = std::make_shared<logging::Logger>(metadata.logger)
101-
}};
99+
try {
100+
return new Class{minifi::core::ProcessorMetadata{
101+
.uuid = minifi::utils::Identifier::parse(std::string{metadata.uuid.data, metadata.uuid.length}).value(),
102+
.name = std::string{metadata.name.data, metadata.name.length},
103+
.logger = std::make_shared<logging::Logger>(metadata.logger)}};
104+
} catch (...) { return nullptr; }
102105
},
103106
.destroy = [] (MINIFI_OWNED void* self) -> void {
104107
delete static_cast<Class*>(self);
@@ -127,7 +130,9 @@ void useProcessorClassDescription(Fn&& fn) {
127130
}
128131
},
129132
.onUnSchedule = [] (void* self) -> void {
130-
static_cast<Class*>(self)->onUnSchedule();
133+
try {
134+
static_cast<Class*>(self)->onUnSchedule();
135+
} catch (...) {}
131136
},
132137
.calculateMetrics = [] (void* self) -> MINIFI_OWNED MinifiPublishedMetrics* {
133138
auto metrics = static_cast<Class*>(self)->calculateMetrics();
@@ -145,4 +150,43 @@ void useProcessorClassDescription(Fn&& fn) {
145150
fn(description);
146151
}
147152

153+
template<typename Class, typename Fn>
154+
void useControllerServiceClassDescription(Fn&& fn) {
155+
std::vector<std::vector<MinifiStringView>> string_vector_cache;
156+
157+
const auto full_name = minifi::core::className<Class>();
158+
159+
std::vector<MinifiPropertyDefinition> class_properties = utils::toProperties(Class::Properties, string_vector_cache);
160+
161+
MinifiControllerServiceClassDefinition description{.full_name = utils::toStringView(full_name),
162+
.description = utils::toStringView(Class::Description),
163+
.class_properties_count = gsl::narrow<uint32_t>(class_properties.size()),
164+
.class_properties_ptr = class_properties.data(),
165+
166+
.callbacks = MinifiControllerServiceCallbacks{
167+
.create = [](MinifiControllerServiceMetadata metadata) -> MINIFI_OWNED void* {
168+
try {
169+
return new Class{minifi::core::ControllerServiceMetadata{
170+
.uuid = minifi::utils::Identifier::parse(std::string{metadata.uuid.data, metadata.uuid.length}).value(),
171+
.name = std::string{metadata.name.data, metadata.name.length},
172+
.logger = std::make_shared<logging::Logger>(metadata.logger)}};
173+
} catch (...) { return nullptr; }
174+
},
175+
.destroy = [](MINIFI_OWNED void* self) -> void { delete static_cast<Class*>(self); },
176+
.enable = [](void* self, MinifiControllerServiceContext* context) -> MinifiStatus {
177+
ControllerServiceContext context_wrapper(context);
178+
try {
179+
return static_cast<Class*>(self)->enable(context_wrapper);
180+
} catch (...) { return MINIFI_STATUS_UNKNOWN_ERROR; }
181+
},
182+
.notifyStop = [](void* self) -> void {
183+
try {
184+
static_cast<Class*>(self)->notifyStop();
185+
} catch (...) {}
186+
},
187+
}};
188+
189+
fn(description);
190+
}
191+
148192
} // namespace org::apache::nifi::minifi::api::core

extension-framework/cpp-extension-lib/include/api/utils/ProcessorConfigUtils.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,4 +167,28 @@ std::optional<T> parseOptionalEnumProperty(const core::ProcessContext& context,
167167
return result.value();
168168
}
169169

170+
template<typename ControllerServiceType>
171+
ControllerServiceType* parseOptionalControllerService(const core::ProcessContext& context, const minifi::core::PropertyReference& prop) {
172+
const auto controller_service_name = context.getProperty(prop.name);
173+
if (!controller_service_name || controller_service_name->empty()) {
174+
return nullptr;
175+
}
176+
177+
nonstd::expected<MinifiControllerService*, std::error_code> service = context.getControllerService(*controller_service_name, minifi::core::className<ControllerServiceType>());
178+
if (!service) {
179+
return nullptr;
180+
}
181+
182+
return reinterpret_cast<ControllerServiceType*>(*service);
183+
}
184+
185+
template<typename ControllerServiceType>
186+
gsl::not_null<ControllerServiceType*> parseControllerService(const core::ProcessContext& context, const minifi::core::PropertyReference& prop) {
187+
auto controller_service = parseOptionalControllerService<ControllerServiceType>(context, prop);
188+
if (!controller_service) {
189+
throw Exception(PROCESS_SCHEDULE_EXCEPTION, fmt::format("Required controller service property '{}' is missing", prop.name));
190+
}
191+
return gsl::make_not_null(controller_service);
192+
}
193+
170194
} // namespace org::apache::nifi::minifi::api::utils

extension-framework/cpp-extension-lib/libtest/CProcessorTestUtils.h

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
*
2+
*
33
* Licensed to the Apache Software Foundation (ASF) under one or more
44
* contributor license agreements. See the NOTICE file distributed with
55
* this work for additional information regarding copyright ownership.
@@ -17,22 +17,40 @@
1717
*/
1818
#pragma once
1919

20+
#include "api/core/Resource.h"
2021
#include "core/Processor.h"
22+
#include "core/controller/ControllerService.h"
23+
#include "minifi-cpp/core/ControllerServiceMetadata.h"
2124
#include "minifi-cpp/core/ProcessorMetadata.h"
22-
#include "api/core/Resource.h"
25+
#include "utils/CControllerService.h"
2326
#include "utils/CProcessor.h"
27+
#include "minifi-cpp/agent/agent_docs.h"
2428

2529
namespace org::apache::nifi::minifi::test::utils {
2630

27-
template<typename T, typename ...Args>
28-
std::unique_ptr<minifi::core::Processor> make_custom_c_processor(minifi::core::ProcessorMetadata metadata, Args&&... args) { // NOLINT(cppcoreguidelines-missing-std-forward)
31+
template<typename T, typename... Args>
32+
std::unique_ptr<minifi::core::Processor> make_custom_c_processor(minifi::core::ProcessorMetadata metadata,
33+
Args&&... args) { // NOLINT(cppcoreguidelines-missing-std-forward)
2934
std::unique_ptr<minifi::core::ProcessorApi> processor_impl;
30-
minifi::api::core::useProcessorClassDescription<T>([&] (const MinifiProcessorClassDefinition& description) {
31-
minifi::utils::useCProcessorClassDescription(description, [&] (const auto&, auto c_description) {
35+
minifi::api::core::useProcessorClassDescription<T>([&](const MinifiProcessorClassDefinition& description) {
36+
minifi::utils::useCProcessorClassDescription(description, [&](const auto&, auto c_description) {
3237
processor_impl = std::make_unique<minifi::utils::CProcessor>(std::move(c_description), metadata, new T(metadata, std::forward<Args>(args)...));
3338
});
3439
});
3540
return std::make_unique<minifi::core::Processor>(metadata.name, metadata.uuid, std::move(processor_impl));
3641
}
3742

43+
template<typename T, typename... Args>
44+
std::shared_ptr<minifi::core::controller::ControllerService> make_custom_c_controller_service(minifi::core::ControllerServiceMetadata metadata,
45+
Args&&... args) { // NOLINT(cppcoreguidelines-missing-std-forward)
46+
std::unique_ptr<minifi::core::controller::ControllerServiceApi> controller_service_impl;
47+
minifi::api::core::useControllerServiceClassDescription<T>([&](const MinifiControllerServiceClassDefinition& description) {
48+
minifi::utils::useCControllerServiceClassDescription(description, [&](const auto&, auto c_description) {
49+
controller_service_impl = std::make_unique<minifi::utils::CControllerService>(std::move(c_description),
50+
metadata,
51+
new T(metadata, std::forward<Args>(args)...));
52+
});
53+
});
54+
return std::make_shared<minifi::core::controller::ControllerService>(metadata.name, metadata.uuid, std::move(controller_service_impl));
55+
}
3856
} // namespace org::apache::nifi::minifi::test::utils
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "api/core/ControllerServiceContext.h"
19+
#include "api/utils/minifi-c-utils.h"
20+
21+
namespace org::apache::nifi::minifi::api::core {
22+
23+
nonstd::expected<std::string, std::error_code> ControllerServiceContext::getProperty(const std::string_view name) const {
24+
std::optional<std::string> value = std::nullopt;
25+
const MinifiStatus status = MinifiControllerServiceContextGetProperty(impl_, utils::toStringView(name),
26+
[] (void* data, const MinifiStringView result) {
27+
(*static_cast<std::optional<std::string>*>(data)) = std::string(result.data, result.length);
28+
}, &value);
29+
30+
if (status != MINIFI_STATUS_SUCCESS) {
31+
return nonstd::make_unexpected(utils::make_error_code(status));
32+
}
33+
34+
if (!value) {
35+
return nonstd::make_unexpected(utils::make_error_code(MINIFI_STATUS_UNKNOWN_ERROR));
36+
}
37+
return value.value();
38+
}
39+
40+
} // namespace org::apache::nifi::minifi::api::core

0 commit comments

Comments
 (0)