|
21 | 21 |
|
22 | 22 | namespace org::apache::nifi::minifi::mock { |
23 | 23 |
|
24 | | -MockProcessSession::~MockProcessSession() { |
25 | | - // api::core::FlowFile requires that somebody takes ownership before it goes out of scope |
26 | | - // the tested processor should either remove or transfer all FlowFiles it handles |
27 | | - for (auto& ff : removed_flow_files_) { |
28 | | - delete ff.release(); // NOLINT(cppcoreguidelines-owning-memory) |
29 | | - } |
30 | | - for (auto& ffs : transferred_flow_files_ | std::views::values) { |
31 | | - for (auto& ff : ffs) { |
32 | | - delete ff.release(); // NOLINT(cppcoreguidelines-owning-memory) |
33 | | - } |
34 | | - } |
35 | | -} |
36 | | - |
37 | 24 | api::core::FlowFile MockProcessSession::create(const api::core::FlowFile*) { |
38 | 25 | auto new_ff = api::core::FlowFile{new MinifiFlowFile}; // NOLINT(clang-analyzer-cplusplus.NewDeleteLeaks) |
39 | | - flow_file_datas[new_ff.get()]; |
40 | 26 | return new_ff; |
41 | 27 | } |
42 | 28 | api::core::FlowFile MockProcessSession::get() { |
43 | 29 | if (input_flow_files_.empty()) { return nullptr; } |
44 | 30 |
|
45 | 31 | auto ff = std::move(input_flow_files_.back()); |
46 | 32 | input_flow_files_.pop_back(); |
47 | | - flow_file_datas[ff.get()]; |
48 | 33 |
|
49 | | - return ff; |
| 34 | + return api::core::FlowFile{ff.release()}; |
50 | 35 | } |
51 | 36 |
|
52 | 37 | void MockProcessSession::penalize(api::core::FlowFile& ff) { |
53 | | - auto& flow_file_data = flow_file_datas.at(ff.get()); |
54 | | - flow_file_data.is_penalized = true; |
| 38 | + ff->is_penalized = true; |
55 | 39 | } |
56 | 40 |
|
57 | 41 | void MockProcessSession::transfer(api::core::FlowFile ff, const minifi::core::Relationship& relationship) { |
58 | | - transferred_flow_files_[relationship.getName()].push_back(std::move(ff)); |
| 42 | + transferred_flow_files_[relationship.getName()].push_back(std::unique_ptr<MinifiFlowFile>(ff.release())); |
59 | 43 | } |
60 | 44 |
|
61 | 45 | void MockProcessSession::remove(api::core::FlowFile ff) { |
62 | | - removed_flow_files_.push_back(std::move(ff)); |
| 46 | + removed_flow_files_.push_back(std::unique_ptr<MinifiFlowFile>(ff.release())); |
63 | 47 | } |
64 | 48 |
|
65 | 49 | void MockProcessSession::write(api::core::FlowFile& ff, const io::OutputStreamCallback& callback) { |
66 | | - auto& flow_file_data = flow_file_datas.at(ff.get()); |
67 | | - const auto stream = std::make_shared<MockOutputStream>(flow_file_data.content); |
| 50 | + const auto stream = std::make_shared<MockOutputStream>(ff->content); |
68 | 51 | callback(stream); |
69 | 52 | } |
70 | 53 |
|
71 | 54 | void MockProcessSession::read(api::core::FlowFile& ff, const io::InputStreamCallback& callback) { |
72 | | - auto& flow_file_data = flow_file_datas.at(ff.get()); |
73 | | - const auto stream = std::make_shared<MockInputStream>(flow_file_data.content); |
| 55 | + const auto stream = std::make_shared<MockInputStream>(ff->content); |
74 | 56 | callback(stream); |
75 | 57 | } |
76 | 58 |
|
77 | 59 | void MockProcessSession::setAttribute(api::core::FlowFile& ff, std::string_view key, std::string value) { |
78 | | - auto& attributes = flow_file_datas.at(ff.get()).attributes; |
79 | | - attributes[std::string(key)] = std::move(value); |
| 60 | + ff->attributes[std::string(key)] = std::move(value); |
80 | 61 | } |
81 | 62 |
|
82 | 63 | void MockProcessSession::removeAttribute(api::core::FlowFile& ff, std::string_view key) { |
83 | | - auto& attributes = flow_file_datas.at(ff.get()).attributes; |
84 | | - attributes.erase(std::string(key)); |
| 64 | + ff->attributes.erase(std::string(key)); |
85 | 65 | } |
86 | 66 |
|
87 | 67 | std::optional<std::string> MockProcessSession::getAttribute(api::core::FlowFile& ff, std::string_view key) { |
88 | | - auto& attributes = flow_file_datas.at(ff.get()).attributes; |
| 68 | + auto& attributes = ff->attributes; |
89 | 69 | if (const auto it = attributes.find(std::string(key)); it != attributes.end()) { return it->second; } |
90 | 70 | return std::nullopt; |
91 | 71 | } |
92 | 72 |
|
93 | 73 | std::map<std::string, std::string> MockProcessSession::getAttributes(const api::core::FlowFile& ff) const { |
94 | | - return flow_file_datas.at(ff.get()).attributes; |
| 74 | + return ff->attributes; |
95 | 75 | } |
96 | 76 |
|
97 | 77 | std::string MockProcessSession::getFlowFileId(const api::core::FlowFile& ff) const { |
98 | | - return flow_file_datas.at(ff.get()).id; |
| 78 | + return ff->id; |
99 | 79 | } |
100 | 80 |
|
101 | 81 | uint64_t MockProcessSession::getFlowFileSize(const api::core::FlowFile& ff) const { |
102 | | - return flow_file_datas.at(ff.get()).content.size(); |
| 82 | + return ff->content.size(); |
103 | 83 | } |
104 | 84 |
|
105 | | -void MockProcessSession::addInputFlowFile(MockFlowFileData flow_file_data) { |
106 | | - auto new_ff = api::core::FlowFile{new MinifiFlowFile}; // NOLINT(clang-analyzer-cplusplus.NewDeleteLeaks) |
107 | | - flow_file_datas.emplace(new_ff.get(), std::move(flow_file_data)); |
108 | | - input_flow_files_.push_back(std::move(new_ff)); |
| 85 | +void MockProcessSession::addInputFlowFile(std::unique_ptr<MinifiFlowFile> flow_file) { |
| 86 | + input_flow_files_.push_back(std::move(flow_file)); |
109 | 87 | } |
110 | 88 | } // namespace org::apache::nifi::minifi::mock |
0 commit comments