Skip to content

Commit e3728f1

Browse files
adamdebrecenilordgamez
authored andcommitted
MINIFICPP-2613 - Move base metrics to libminifi
Signed-off-by: Gabor Gyimesi <gamezbird@gmail.com> This closes #2020
1 parent d090128 commit e3728f1

23 files changed

Lines changed: 170 additions & 189 deletions

File tree

core-framework/include/core/ProcessorImpl.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@
3333
#include "minifi-cpp/core/Annotation.h"
3434
#include "minifi-cpp/core/DynamicProperty.h"
3535
#include "minifi-cpp/core/Scheduling.h"
36-
#include "minifi-cpp/core/state/nodes/MetricsBase.h"
37-
#include "minifi-cpp/core/ProcessorMetrics.h"
36+
#include "minifi-cpp/core/ProcessorMetricsExtension.h"
3837
#include "minifi-cpp/utils/gsl.h"
3938
#include "utils/Id.h"
4039
#include "minifi-cpp/core/OutputAttributeDefinition.h"
@@ -118,8 +117,8 @@ class ProcessorImpl : public virtual ProcessorApi {
118117

119118
annotation::Input getInputRequirement() const override = 0;
120119

121-
gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const override {
122-
return metrics_;
120+
std::shared_ptr<ProcessorMetricsExtension> getMetricsExtension() const override {
121+
return metrics_extension_;
123122
}
124123

125124
static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{};
@@ -142,7 +141,7 @@ class ProcessorImpl : public virtual ProcessorApi {
142141

143142
std::atomic<bool> trigger_when_empty_;
144143

145-
gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_;
144+
std::shared_ptr<ProcessorMetricsExtension> metrics_extension_;
146145

147146
std::shared_ptr<logging::Logger> logger_;
148147

core-framework/src/core/ProcessorImpl.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
#include "range/v3/algorithm/any_of.hpp"
3737
#include "fmt/format.h"
3838
#include "minifi-cpp/Exception.h"
39-
#include "core/ProcessorMetrics.h"
4039
#include "minifi-cpp/core/ProcessorDescriptor.h"
4140

4241
using namespace std::literals::chrono_literals;
@@ -46,7 +45,6 @@ namespace org::apache::nifi::minifi::core {
4645
ProcessorImpl::ProcessorImpl(ProcessorMetadata metadata)
4746
: metadata_(std::move(metadata)),
4847
trigger_when_empty_(false),
49-
metrics_(std::make_shared<ProcessorMetricsImpl>(*this)),
5048
logger_(metadata_.logger) {
5149
logger_->log_debug("Processor {} created with uuid {}", getName(), getUUIDStr());
5250
}

extensions/llamacpp/processors/RunLlamaCppInference.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,13 @@ void RunLlamaCppInference::onSchedule(core::ProcessContext& context, core::Proce
6464
}
6565

6666
void RunLlamaCppInference::increaseTokensIn(uint64_t token_count) {
67-
auto* const llamacpp_metrics = dynamic_cast<RunLlamaCppInferenceMetrics*>(metrics_.get());
67+
auto* const llamacpp_metrics = dynamic_cast<RunLlamaCppInferenceMetrics*>(metrics_extension_.get());
6868
gsl_Assert(llamacpp_metrics);
6969
llamacpp_metrics->tokens_in += token_count;
7070
}
7171

7272
void RunLlamaCppInference::increaseTokensOut(uint64_t token_count) {
73-
auto* const llamacpp_metrics = dynamic_cast<RunLlamaCppInferenceMetrics*>(metrics_.get());
73+
auto* const llamacpp_metrics = dynamic_cast<RunLlamaCppInferenceMetrics*>(metrics_extension_.get());
7474
gsl_Assert(llamacpp_metrics);
7575
llamacpp_metrics->tokens_out += token_count;
7676
}

extensions/llamacpp/processors/RunLlamaCppInference.h

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,36 +24,34 @@
2424
#include "core/logging/LoggerFactory.h"
2525
#include "core/PropertyDefinitionBuilder.h"
2626
#include "LlamaContext.h"
27-
#include "core/ProcessorMetrics.h"
27+
#include "minifi-cpp/core/ProcessorMetricsExtension.h"
28+
#include "core/state/Value.h"
2829

2930
namespace org::apache::nifi::minifi::extensions::llamacpp::processors {
3031

3132
using LlamaContextProvider =
3233
std::function<std::unique_ptr<LlamaContext>(const std::filesystem::path& model_path, const LlamaSamplerParams& llama_sampler_params, const LlamaContextParams& llama_ctx_params)>;
3334

34-
class RunLlamaCppInferenceMetrics : public core::ProcessorMetricsImpl {
35+
class RunLlamaCppInferenceMetrics : public core::ProcessorMetricsExtension {
3536
public:
36-
explicit RunLlamaCppInferenceMetrics(const core::ProcessorImpl& source_processor)
37-
: core::ProcessorMetricsImpl(source_processor) {
38-
}
37+
RunLlamaCppInferenceMetrics() = default;
3938

4039
std::vector<state::response::SerializedResponseNode> serialize() override {
41-
auto resp = core::ProcessorMetricsImpl::serialize();
42-
auto& root_node = resp[0];
40+
std::vector<state::response::SerializedResponseNode> resp;
4341

4442
state::response::SerializedResponseNode tokens_in_node{"TokensIn", tokens_in.load()};
45-
root_node.children.push_back(tokens_in_node);
43+
resp.push_back(tokens_in_node);
4644

4745
state::response::SerializedResponseNode tokens_out_node{"TokensOut", tokens_out.load()};
48-
root_node.children.push_back(tokens_out_node);
46+
resp.push_back(tokens_out_node);
4947

5048
return resp;
5149
}
5250

5351
std::vector<state::PublishedMetric> calculateMetrics() override {
54-
auto metrics = core::ProcessorMetricsImpl::calculateMetrics();
55-
metrics.push_back({"tokens_in", static_cast<double>(tokens_in.load()), getCommonLabels()});
56-
metrics.push_back({"tokens_out", static_cast<double>(tokens_out.load()), getCommonLabels()});
52+
std::vector<state::PublishedMetric> metrics;
53+
metrics.push_back({"tokens_in", static_cast<double>(tokens_in.load()), {}});
54+
metrics.push_back({"tokens_out", static_cast<double>(tokens_out.load()), {}});
5755
return metrics;
5856
}
5957

@@ -66,7 +64,7 @@ class RunLlamaCppInference : public core::ProcessorImpl {
6664
explicit RunLlamaCppInference(core::ProcessorMetadata metadata, LlamaContextProvider llama_context_provider = {})
6765
: core::ProcessorImpl(metadata),
6866
llama_context_provider_(std::move(llama_context_provider)) {
69-
metrics_ = gsl::make_not_null(std::make_shared<RunLlamaCppInferenceMetrics>(*this));
67+
metrics_extension_ = gsl::make_not_null(std::make_shared<RunLlamaCppInferenceMetrics>());
7068
}
7169
~RunLlamaCppInference() override = default;
7270

extensions/mqtt/processors/PublishMQTT.cpp

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -326,25 +326,22 @@ uint16_t PublishMQTT::InFlightMessageCounter::getCounter() const {
326326
return counter_;
327327
}
328328

329-
PublishMQTT::PublishMQTTMetrics::PublishMQTTMetrics(const core::ProcessorImpl& source_processor, const InFlightMessageCounter& in_flight_message_counter)
330-
: core::ProcessorMetricsImpl(source_processor),
331-
in_flight_message_counter_(&in_flight_message_counter) {
329+
PublishMQTT::PublishMQTTMetrics::PublishMQTTMetrics(const InFlightMessageCounter& in_flight_message_counter)
330+
: in_flight_message_counter_(&in_flight_message_counter) {
332331
}
333332

334333
std::vector<state::response::SerializedResponseNode> PublishMQTT::PublishMQTTMetrics::serialize() {
335-
auto metrics_vector = core::ProcessorMetricsImpl::serialize();
336-
gsl_Expects(!metrics_vector.empty());
337-
auto& metrics = metrics_vector[0];
334+
std::vector<state::response::SerializedResponseNode> metrics;
338335

339336
state::response::SerializedResponseNode in_flight_message_count_node{"InFlightMessageCount", static_cast<uint32_t>(in_flight_message_counter_->getCounter())};
340-
metrics.children.push_back(in_flight_message_count_node);
337+
metrics.push_back(in_flight_message_count_node);
341338

342-
return metrics_vector;
339+
return metrics;
343340
}
344341

345342
std::vector<state::PublishedMetric> PublishMQTT::PublishMQTTMetrics::calculateMetrics() {
346-
auto metrics = core::ProcessorMetricsImpl::calculateMetrics();
347-
metrics.push_back({"in_flight_message_count", static_cast<double>(in_flight_message_counter_->getCounter()), getCommonLabels()});
343+
std::vector<state::PublishedMetric> metrics;
344+
metrics.push_back({"in_flight_message_count", static_cast<double>(in_flight_message_counter_->getCounter()), {}});
348345
return metrics;
349346
}
350347

extensions/mqtt/processors/PublishMQTT.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
4141
public:
4242
explicit PublishMQTT(core::ProcessorMetadata metadata)
4343
: processors::AbstractMQTTProcessor(metadata) {
44-
metrics_ = gsl::make_not_null(std::make_shared<PublishMQTTMetrics>(*this, in_flight_message_counter_));
44+
metrics_extension_ = gsl::make_not_null(std::make_shared<PublishMQTTMetrics>(in_flight_message_counter_));
4545
}
4646

4747
EXTENSIONAPI static constexpr const char* Description = "PublishMQTT serializes FlowFile content as an MQTT payload, sending the message to the configured topic and broker.";
@@ -111,9 +111,9 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
111111
uint16_t limit_{MQTT_MAX_RECEIVE_MAXIMUM};
112112
};
113113

114-
class PublishMQTTMetrics : public core::ProcessorMetricsImpl {
114+
class PublishMQTTMetrics : public core::ProcessorMetricsExtension {
115115
public:
116-
PublishMQTTMetrics(const core::ProcessorImpl& source_processor, const InFlightMessageCounter& in_flight_message_counter);
116+
explicit PublishMQTTMetrics(const InFlightMessageCounter& in_flight_message_counter);
117117
std::vector<state::response::SerializedResponseNode> serialize() override;
118118
std::vector<state::PublishedMetric> calculateMetrics() override;
119119

extensions/standard-processors/processors/GetFile.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ bool GetFile::fileMatchesRequestCriteria(const std::filesystem::path& full_name,
178178
return false;
179179
}
180180

181-
auto* const getfile_metrics = dynamic_cast<GetFileMetrics*>(metrics_.get());
181+
auto* const getfile_metrics = dynamic_cast<GetFileMetrics*>(metrics_extension_.get());
182182
gsl_Assert(getfile_metrics);
183183
getfile_metrics->input_bytes += file_size;
184184
++getfile_metrics->accepted_files;

extensions/standard-processors/processors/GetFile.h

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
#include "core/Core.h"
3535
#include "core/logging/LoggerFactory.h"
3636
#include "minifi-cpp/utils/Export.h"
37-
#include "core/ProcessorMetrics.h"
37+
#include "minifi-cpp/core/ProcessorMetricsExtension.h"
3838

3939
namespace org::apache::nifi::minifi::processors {
4040

@@ -52,29 +52,24 @@ struct GetFileRequest {
5252
std::filesystem::path inputDirectory;
5353
};
5454

55-
class GetFileMetrics : public core::ProcessorMetricsImpl {
55+
class GetFileMetrics : public core::ProcessorMetricsExtension {
5656
public:
57-
explicit GetFileMetrics(const core::ProcessorImpl& source_processor)
58-
: core::ProcessorMetricsImpl(source_processor) {
59-
}
60-
6157
std::vector<state::response::SerializedResponseNode> serialize() override {
62-
auto resp = core::ProcessorMetricsImpl::serialize();
63-
auto& root_node = resp[0];
58+
std::vector<state::response::SerializedResponseNode> resp;
6459

6560
state::response::SerializedResponseNode accepted_files_node{"AcceptedFiles", accepted_files.load()};
66-
root_node.children.push_back(accepted_files_node);
61+
resp.push_back(accepted_files_node);
6762

6863
state::response::SerializedResponseNode input_bytes_node{"InputBytes", input_bytes.load()};
69-
root_node.children.push_back(input_bytes_node);
64+
resp.push_back(input_bytes_node);
7065

7166
return resp;
7267
}
7368

7469
std::vector<state::PublishedMetric> calculateMetrics() override {
75-
auto metrics = core::ProcessorMetricsImpl::calculateMetrics();
76-
metrics.push_back({"accepted_files", static_cast<double>(accepted_files.load()), getCommonLabels()});
77-
metrics.push_back({"input_bytes", static_cast<double>(input_bytes.load()), getCommonLabels()});
70+
std::vector<state::PublishedMetric> metrics;
71+
metrics.push_back({"accepted_files", static_cast<double>(accepted_files.load()), {}});
72+
metrics.push_back({"input_bytes", static_cast<double>(input_bytes.load()), {}});
7873
return metrics;
7974
}
8075

@@ -86,7 +81,7 @@ class GetFile : public core::ProcessorImpl {
8681
public:
8782
explicit GetFile(core::ProcessorMetadata metadata)
8883
: ProcessorImpl(metadata) {
89-
metrics_ = gsl::make_not_null(std::make_shared<GetFileMetrics>(*this));
84+
metrics_extension_ = std::make_shared<GetFileMetrics>();
9085
}
9186
~GetFile() override = default;
9287

libminifi/include/core/ProcessSession.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
#include "provenance/Provenance.h"
3939
#include "core/Relationship.h"
4040
#include "minifi-cpp/utils/gsl.h"
41-
#include "minifi-cpp/core/ProcessorMetrics.h"
41+
#include "core/ProcessorMetrics.h"
4242
#include "minifi-cpp/core/ProcessSession.h"
4343

4444
namespace org::apache::nifi::minifi::core::detail {
@@ -123,7 +123,7 @@ class ProcessSessionImpl : public ReferenceContainerImpl, public virtual Process
123123

124124
bool existsFlowFileInRelationship(const Relationship &relationship) override;
125125

126-
void setMetrics(const std::shared_ptr<ProcessorMetrics>& metrics) override {
126+
void setMetrics(const std::shared_ptr<ProcessorMetrics>& metrics) {
127127
metrics_ = metrics;
128128
}
129129

libminifi/include/core/Processor.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
#include "minifi-cpp/core/DynamicProperty.h"
3939
#include "minifi-cpp/core/Scheduling.h"
4040
#include "minifi-cpp/core/state/nodes/MetricsBase.h"
41-
#include "minifi-cpp/core/ProcessorMetrics.h"
41+
#include "core/ProcessorMetrics.h"
4242
#include "minifi-cpp/utils/gsl.h"
4343
#include "utils/Id.h"
4444
#include "minifi-cpp/core/OutputAttributeDefinition.h"
@@ -113,6 +113,7 @@ class Processor : public ConnectableImpl, public ConfigurableComponentImpl, publ
113113
[[nodiscard]] bool supportsDynamicRelationships() const override;
114114
state::response::SharedResponseNode getResponseNode() override;
115115
gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const;
116+
std::shared_ptr<ProcessorMetricsExtension> getMetricsExtension() const;
116117
std::string getProcessGroupName() const;
117118
void setProcessGroupName(const std::string &name);
118119
std::string getProcessGroupPath() const;
@@ -170,6 +171,8 @@ class Processor : public ConnectableImpl, public ConfigurableComponentImpl, publ
170171
std::string process_group_name_;
171172
std::string process_group_path_;
172173

174+
gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_;
175+
173176
protected:
174177
std::unique_ptr<ProcessorApi> impl_;
175178
};

0 commit comments

Comments
 (0)