Skip to content

Commit 9debeb4

Browse files
committed
MINIFICPP-2765 Move GCP Extension to stable C API
1 parent 5b9fc33 commit 9debeb4

31 files changed

Lines changed: 792 additions & 623 deletions

core-framework/common/include/core/PropertyDefinitionBuilder.h

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,21 @@ namespace org::apache::nifi::minifi::core {
2727
namespace detail {
2828
template<typename... Types>
2929
inline constexpr auto TypeNames = std::array<std::string_view, sizeof...(Types)>{core::className<Types>()...};
30-
}
30+
31+
template <size_t N>
32+
struct StringLiteral {
33+
char value[N];
34+
constexpr StringLiteral(const char (&str)[N]) { // NOLINT(runtime/explicit)
35+
for (size_t i = 0; i < N; ++i) {
36+
value[i] = str[i];
37+
}
38+
}
39+
};
40+
41+
// A variable template that creates permanent static memory for the span to point to
42+
template <StringLiteral str>
43+
inline constexpr auto StaticAllowedType = std::array<std::string_view, 1>{std::string_view{str.value, sizeof(str.value) - 1}};
44+
} // namespace detail
3145

3246
template<size_t NumAllowedValues = 0>
3347
struct PropertyDefinitionBuilder {
@@ -81,6 +95,12 @@ struct PropertyDefinitionBuilder {
8195
return *this;
8296
}
8397

98+
template <detail::StringLiteral TypeName>
99+
constexpr PropertyDefinitionBuilder<NumAllowedValues> withAllowedType() {
100+
property.allowed_types = detail::StaticAllowedType<TypeName>;
101+
return *this;
102+
}
103+
84104
constexpr PropertyDefinitionBuilder<NumAllowedValues> withValidator(const PropertyValidator& property_validator) {
85105
property.validator = gsl::make_not_null(&property_validator);
86106
return *this;

extension-framework/cpp-extension-lib/include/api/core/ProcessContext.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717

1818
#pragma once
1919

20-
#include <string>
2120
#include <expected>
21+
#include <string>
2222

2323
#include "api/core/FlowFile.h"
24+
#include "api/utils/Proxy.h"
2425
#include "api/utils/Ssl.h"
2526
#include "minifi-c.h"
2627
#include "minifi-cpp/core/PropertyDefinition.h"
@@ -45,6 +46,7 @@ class ProcessContext {
4546
[[nodiscard]] virtual std::map<std::string, std::string> getDynamicProperties(const FlowFile* flow_file) const = 0;
4647

4748
[[nodiscard]] virtual std::expected<utils::net::SslData, std::error_code> getSslData(std::string_view name) const = 0;
49+
[[nodiscard]] virtual std::expected<utils::ProxyData, std::error_code> getProxyData(std::string_view name) const = 0;
4850
};
4951

5052
class CffiProcessContext : public ProcessContext {
@@ -59,6 +61,7 @@ class CffiProcessContext : public ProcessContext {
5961
[[nodiscard]] bool hasNonEmptyProperty(std::string_view name) const override;
6062

6163
[[nodiscard]] std::expected<utils::net::SslData, std::error_code> getSslData(std::string_view name) const override;
64+
[[nodiscard]] std::expected<utils::ProxyData, std::error_code> getProxyData(std::string_view name) const override;
6265

6366
private:
6467
[[nodiscard]] std::expected<std::string, std::error_code> getProperty(std::string_view name, const FlowFile* flow_file) const;
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#pragma once
18+
19+
#include <filesystem>
20+
#include <optional>
21+
#include <string>
22+
23+
namespace org::apache::nifi::minifi::api::utils {
24+
25+
enum class ProxyType {
26+
DIRECT,
27+
HTTP
28+
};
29+
30+
struct BasicAuthCredentials {
31+
std::string username;
32+
std::string password;
33+
};
34+
35+
struct ProxyData {
36+
std::string host;
37+
uint16_t port;
38+
std::optional<BasicAuthCredentials> proxy_credentials;
39+
ProxyType proxy_type;
40+
};
41+
42+
} // namespace org::apache::nifi::minifi::api::utils::net

extension-framework/cpp-extension-lib/include/api/utils/Ssl.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,8 @@
1717
#pragma once
1818

1919
#include <string>
20-
#include <memory>
21-
#include <optional>
2220
#include <filesystem>
2321

24-
#include "utils/Enum.h"
25-
2622
namespace org::apache::nifi::minifi::api::utils::net {
2723

2824
enum class ClientAuthOption {

extension-framework/cpp-extension-lib/src/core/ProcessContext.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,34 @@ std::expected<utils::net::SslData, std::error_code> CffiProcessContext::getSslDa
8686
return ssl_data;
8787
}
8888

89+
std::expected<utils::ProxyData, std::error_code> CffiProcessContext::getProxyData(const std::string_view name) const {
90+
auto proxy_data = utils::ProxyData{};
91+
92+
if (const auto status = MinifiProcessContextGetProxyData(
93+
impl_,
94+
utils::minifiStringView(name),
95+
[](void* data, const MinifiProxyData* minifi_proxy_data) {
96+
auto* proxy = static_cast<utils::ProxyData*>(data);
97+
proxy->host = utils::toString(minifi_proxy_data->hostname);
98+
proxy->port = minifi_proxy_data->port;
99+
if (minifi_proxy_data->password && minifi_proxy_data->username) {
100+
proxy->proxy_credentials = utils::BasicAuthCredentials{.username = utils::toString(*minifi_proxy_data->username),
101+
.password = utils::toString(*minifi_proxy_data->password)};
102+
} else {
103+
proxy->proxy_credentials = std::nullopt;
104+
}
105+
if (minifi_proxy_data->proxy_type == MINIFI_PROXY_TYPE_HTTP) {
106+
proxy->proxy_type = utils::ProxyType::HTTP;
107+
} else {
108+
proxy->proxy_type = utils::ProxyType::DIRECT;
109+
}
110+
},
111+
&proxy_data);
112+
status != MINIFI_STATUS_SUCCESS) {
113+
return std::unexpected{utils::make_error_code(status)};
114+
}
115+
116+
return proxy_data;
117+
}
118+
89119
} // namespace org::apache::nifi::minifi::api::core

extensions/gcp/CMakeLists.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ add_minifi_library(minifi-gcp SHARED ${SOURCES})
3030
if (NOT WIN32)
3131
target_compile_options(minifi-gcp PRIVATE -Wno-error=deprecated-declarations) # Suppress deprecation warnings for std::rel_ops usage
3232
endif()
33-
target_link_libraries(minifi-gcp ${LIBMINIFI} google-cloud-cpp::storage)
34-
target_include_directories(minifi-gcp SYSTEM PUBLIC ${google-cloud-cpp_INCLUDE_DIRS})
33+
target_link_libraries(minifi-gcp minifi-cpp-extension-lib google-cloud-cpp::storage)
3534

36-
register_extension(minifi-gcp "GCP EXTENSIONS" GCP-EXTENSIONS "This enables Google Cloud Platform support" "extensions/gcp/tests")
35+
target_include_directories(minifi-gcp SYSTEM PUBLIC ${google-cloud-cpp_INCLUDE_DIRS})
3736

37+
register_c_api_extension(minifi-gcp "GCP EXTENSIONS" GCP-EXTENSIONS "This enables Google Cloud Platform support" "extensions/gcp/tests")
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "../../extension-framework/cpp-extension-lib/include/api/core/Resource.h"
19+
#include "api/core/Resource.h"
20+
#include "api/utils/minifi-c-utils.h"
21+
#include "processors/DeleteGCSObject.h"
22+
#include "processors/FetchGCSObject.h"
23+
#include "processors/ListGCSBucket.h"
24+
#include "processors/PutGCSObject.h"
25+
26+
#define MKSOC(x) #x
27+
#define MAKESTRING(x) MKSOC(x) // NOLINT(cppcoreguidelines-macro-usage)
28+
29+
namespace minifi = org::apache::nifi::minifi;
30+
31+
CEXTENSIONAPI const uint32_t MinifiApiVersion = MINIFI_API_VERSION;
32+
33+
CEXTENSIONAPI void MinifiInitExtension(MinifiExtensionContext* extension_context) {
34+
const MinifiExtensionDefinition extension_definition{
35+
.name = minifi::api::utils::minifiStringView(MAKESTRING(EXTENSION_NAME)),
36+
.version = minifi::api::utils::minifiStringView(MAKESTRING(EXTENSION_VERSION)),
37+
.deinit = nullptr,
38+
.user_data = nullptr
39+
};
40+
auto* extension = MinifiRegisterExtension(extension_context, &extension_definition);
41+
minifi::api::core::registerProcessors<minifi::extensions::gcp::DeleteGCSObject,
42+
minifi::extensions::gcp::FetchGCSObject,
43+
minifi::extensions::gcp::ListGCSBucket,
44+
minifi::extensions::gcp::PutGCSObject>(extension);
45+
minifi::api::core::registerControllerServices<minifi::extensions::gcp::GCPCredentialsControllerService>(extension);
46+
}

extensions/gcp/GCPAttributes.h

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@
1919

2020
#include <string_view>
2121

22+
#include "api/core/FlowFile.h"
23+
#include "api/core/ProcessSession.h"
2224
#include "google/cloud/storage/object_metadata.h"
23-
#include "minifi-cpp/core/FlowFile.h"
2425

2526
namespace org::apache::nifi::minifi::extensions::gcp {
2627

@@ -50,32 +51,32 @@ constexpr std::string_view GCS_SELF_LINK_ATTR = "gcs.self.link";
5051
constexpr std::string_view GCS_ENCRYPTION_ALGORITHM_ATTR = "gcs.encryption.algorithm";
5152
constexpr std::string_view GCS_ENCRYPTION_SHA256_ATTR = "gcs.encryption.sha256";
5253

53-
inline void setAttributesFromObjectMetadata(core::FlowFile& flow_file, const ::google::cloud::storage::ObjectMetadata& object_metadata) {
54-
flow_file.setAttribute(GCS_BUCKET_ATTR, object_metadata.bucket());
55-
flow_file.setAttribute(GCS_OBJECT_NAME_ATTR, object_metadata.name());
56-
flow_file.setAttribute(GCS_SIZE_ATTR, std::to_string(object_metadata.size()));
57-
flow_file.setAttribute(GCS_CRC32C_ATTR, object_metadata.crc32c());
58-
flow_file.setAttribute(GCS_MD5_ATTR, object_metadata.md5_hash());
59-
flow_file.setAttribute(GCS_CONTENT_ENCODING_ATTR, object_metadata.content_encoding());
60-
flow_file.setAttribute(GCS_CONTENT_LANGUAGE_ATTR, object_metadata.content_language());
61-
flow_file.setAttribute(GCS_CONTENT_DISPOSITION_ATTR, object_metadata.content_disposition());
62-
flow_file.setAttribute(GCS_CREATE_TIME_ATTR, std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(object_metadata.time_created().time_since_epoch()).count()));
63-
flow_file.setAttribute(GCS_UPDATE_TIME_ATTR, std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(object_metadata.updated().time_since_epoch()).count()));
64-
flow_file.setAttribute(GCS_DELETE_TIME_ATTR, std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(object_metadata.time_deleted().time_since_epoch()).count()));
65-
flow_file.setAttribute(GCS_MEDIA_LINK_ATTR, object_metadata.media_link());
66-
flow_file.setAttribute(GCS_SELF_LINK_ATTR, object_metadata.self_link());
67-
flow_file.setAttribute(GCS_ETAG_ATTR, object_metadata.etag());
68-
flow_file.setAttribute(GCS_GENERATED_ID, object_metadata.id());
69-
flow_file.setAttribute(GCS_META_GENERATION, std::to_string(object_metadata.metageneration()));
70-
flow_file.setAttribute(GCS_GENERATION, std::to_string(object_metadata.generation()));
71-
flow_file.setAttribute(GCS_STORAGE_CLASS, object_metadata.storage_class());
54+
inline void setAttributesFromObjectMetadata(api::core::FlowFile& flow_file, const ::google::cloud::storage::ObjectMetadata& object_metadata, api::core::ProcessSession& session) {
55+
session.setAttribute(flow_file, GCS_BUCKET_ATTR, object_metadata.bucket());
56+
session.setAttribute(flow_file, GCS_OBJECT_NAME_ATTR, object_metadata.name());
57+
session.setAttribute(flow_file, GCS_SIZE_ATTR, std::to_string(object_metadata.size()));
58+
session.setAttribute(flow_file, GCS_CRC32C_ATTR, object_metadata.crc32c());
59+
session.setAttribute(flow_file, GCS_MD5_ATTR, object_metadata.md5_hash());
60+
session.setAttribute(flow_file, GCS_CONTENT_ENCODING_ATTR, object_metadata.content_encoding());
61+
session.setAttribute(flow_file, GCS_CONTENT_LANGUAGE_ATTR, object_metadata.content_language());
62+
session.setAttribute(flow_file, GCS_CONTENT_DISPOSITION_ATTR, object_metadata.content_disposition());
63+
session.setAttribute(flow_file, GCS_CREATE_TIME_ATTR, std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(object_metadata.time_created().time_since_epoch()).count()));
64+
session.setAttribute(flow_file, GCS_UPDATE_TIME_ATTR, std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(object_metadata.updated().time_since_epoch()).count()));
65+
session.setAttribute(flow_file, GCS_DELETE_TIME_ATTR, std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(object_metadata.time_deleted().time_since_epoch()).count()));
66+
session.setAttribute(flow_file, GCS_MEDIA_LINK_ATTR, object_metadata.media_link());
67+
session.setAttribute(flow_file, GCS_SELF_LINK_ATTR, object_metadata.self_link());
68+
session.setAttribute(flow_file, GCS_ETAG_ATTR, object_metadata.etag());
69+
session.setAttribute(flow_file, GCS_GENERATED_ID, object_metadata.id());
70+
session.setAttribute(flow_file, GCS_META_GENERATION, std::to_string(object_metadata.metageneration()));
71+
session.setAttribute(flow_file, GCS_GENERATION, std::to_string(object_metadata.generation()));
72+
session.setAttribute(flow_file, GCS_STORAGE_CLASS, object_metadata.storage_class());
7273
if (object_metadata.has_customer_encryption()) {
73-
flow_file.setAttribute(GCS_ENCRYPTION_ALGORITHM_ATTR, object_metadata.customer_encryption().encryption_algorithm);
74-
flow_file.setAttribute(GCS_ENCRYPTION_SHA256_ATTR, object_metadata.customer_encryption().key_sha256);
74+
session.setAttribute(flow_file, GCS_ENCRYPTION_ALGORITHM_ATTR, object_metadata.customer_encryption().encryption_algorithm);
75+
session.setAttribute(flow_file, GCS_ENCRYPTION_SHA256_ATTR, object_metadata.customer_encryption().key_sha256);
7576
}
7677
if (object_metadata.has_owner()) {
77-
flow_file.setAttribute(GCS_OWNER_ENTITY_ATTR, object_metadata.owner().entity);
78-
flow_file.setAttribute(GCS_OWNER_ENTITY_ID_ATTR, object_metadata.owner().entity_id);
78+
session.setAttribute(flow_file, GCS_OWNER_ENTITY_ATTR, object_metadata.owner().entity);
79+
session.setAttribute(flow_file, GCS_OWNER_ENTITY_ID_ATTR, object_metadata.owner().entity_id);
7980
}
8081
}
8182

extensions/gcp/controllerservices/GCPCredentialsControllerService.cpp

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,36 @@
1818

1919
#include "GCPCredentialsControllerService.h"
2020

21-
#include "core/Resource.h"
2221
#include "google/cloud/storage/client.h"
23-
#include "utils/ProcessorConfigUtils.h"
24-
#include "utils/file/FileUtils.h"
2522

2623
namespace org::apache::nifi::minifi::extensions::gcp {
2724

28-
void GCPCredentialsControllerService::initialize() {
29-
setSupportedProperties(Properties);
25+
namespace {
26+
// TODO(MINIFICPP-2763) use utils::file::get_content instead
27+
std::string get_content(const std::filesystem::path& file_name) {
28+
std::ifstream file(file_name, std::ifstream::binary);
29+
std::string content((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
30+
return content;
31+
}
3032
}
3133

32-
std::shared_ptr<google::cloud::Credentials> GCPCredentialsControllerService::createCredentialsFromJsonPath() const {
33-
const auto json_path = getProperty(JsonFilePath.name);
34+
std::shared_ptr<google::cloud::Credentials> GCPCredentialsControllerService::createCredentialsFromJsonPath(api::core::ControllerServiceContext& ctx) const {
35+
const auto json_path = ctx.getProperty(JsonFilePath.name);
3436
if (!json_path) {
3537
logger_->log_error("Missing or invalid {}", JsonFilePath.name);
3638
return nullptr;
3739
}
3840

39-
if (!utils::file::exists(*json_path)) {
41+
if (std::error_code ec; !std::filesystem::exists(*json_path, ec) || ec) {
4042
logger_->log_error("JSON file for GCP credentials '{}' does not exist", *json_path);
4143
return nullptr;
4244
}
4345

44-
return google::cloud::MakeServiceAccountCredentials(utils::file::get_content(*json_path));
46+
return google::cloud::MakeServiceAccountCredentials(get_content(*json_path));
4547
}
4648

47-
std::shared_ptr<google::cloud::Credentials> GCPCredentialsControllerService::createCredentialsFromJsonContents() const {
48-
auto json_contents = getProperty(JsonContents.name);
49+
std::shared_ptr<google::cloud::Credentials> GCPCredentialsControllerService::createCredentialsFromJsonContents(api::core::ControllerServiceContext& ctx) const {
50+
auto json_contents = ctx.getProperty(JsonContents.name);
4951
if (!json_contents) {
5052
logger_->log_error("Missing or invalid {}", JsonContents.name);
5153
return nullptr;
@@ -54,9 +56,9 @@ std::shared_ptr<google::cloud::Credentials> GCPCredentialsControllerService::cre
5456
return google::cloud::MakeServiceAccountCredentials(*json_contents);
5557
}
5658

57-
void GCPCredentialsControllerService::onEnable() {
59+
MinifiStatus GCPCredentialsControllerService::enableImpl(api::core::ControllerServiceContext& ctx) {
5860
std::optional<CredentialsLocation> credentials_location;
59-
if (const auto value = getProperty(CredentialsLoc.name)) {
61+
if (const auto value = ctx.getProperty(CredentialsLoc.name)) {
6062
credentials_location = magic_enum::enum_cast<CredentialsLocation>(*value);
6163
}
6264
if (!credentials_location) {
@@ -68,15 +70,15 @@ void GCPCredentialsControllerService::onEnable() {
6870
} else if (*credentials_location == CredentialsLocation::USE_COMPUTE_ENGINE_CREDENTIALS) {
6971
credentials_ = google::cloud::MakeComputeEngineCredentials();
7072
} else if (*credentials_location == CredentialsLocation::USE_JSON_FILE) {
71-
credentials_ = createCredentialsFromJsonPath();
73+
credentials_ = createCredentialsFromJsonPath(ctx);
7274
} else if (*credentials_location == CredentialsLocation::USE_JSON_CONTENTS) {
73-
credentials_ = createCredentialsFromJsonContents();
75+
credentials_ = createCredentialsFromJsonContents(ctx);
7476
} else if (*credentials_location == CredentialsLocation::USE_ANONYMOUS_CREDENTIALS) {
7577
credentials_ = google::cloud::MakeInsecureCredentials();
7678
}
7779
if (!credentials_)
7880
logger_->log_error("Couldn't create valid credentials");
81+
return MINIFI_STATUS_SUCCESS;
7982
}
8083

81-
REGISTER_RESOURCE(GCPCredentialsControllerService, ControllerService);
8284
} // namespace org::apache::nifi::minifi::extensions::gcp

0 commit comments

Comments
 (0)