-
Notifications
You must be signed in to change notification settings - Fork 166
Expand file tree
/
Copy pathDataProcessorAdapter.h
More file actions
199 lines (165 loc) · 7.78 KB
/
Copy pathDataProcessorAdapter.h
File metadata and controls
199 lines (165 loc) · 7.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef QUALITYCONTROL_DATAPROCESSORADAPTER_H
#define QUALITYCONTROL_DATAPROCESSORADAPTER_H
///
/// \file DataProcessorAdapter.h
/// \author Piotr Konopka
///
#include <Framework/DataProcessorSpec.h>
#include "QualityControl/Actor.h"
#include "QualityControl/ActorTraits.h"
#include "QualityControl/ActorHelpers.h"
#include "QualityControl/UserInputOutput.h"
namespace o2::quality_control::core
{
// helpers for DataProcessorAdapter
namespace impl
{
/// \brief satisfied by UserCodeConfig itself and by any type derived from it; cv-qualifiers and references are ignored
template <class Candidate>
concept UserCodeConfigSingle = std::derived_from<std::remove_cvref_t<Candidate>, UserCodeConfig>;
/// \brief satisfied by input ranges whose value type is UserCodeConfig or a type derived from it
template <class Candidate>
concept UserCodeConfigRange = std::ranges::input_range<Candidate> &&
                              std::derived_from<std::remove_cvref_t<std::ranges::range_value_t<Candidate>>, UserCodeConfig>;
/// \brief normalizes the argument to a range: a range is passed through as a view, anything else becomes a one-element view
template <class T>
auto as_range(T&& t)
{
  if constexpr (!std::ranges::range<std::remove_reference_t<T>>) {
    // not a range: expose the value as a range holding exactly one element
    return std::views::single(std::forward<T>(t));
  } else {
    // a range already: std::views::all only gives it a uniform view interface
    return std::views::all(std::forward<T>(t));
  }
}
} // namespace impl
/// \brief Wraps QC actors into DPL DataProcessorSpecs and derives their names, labels, inputs and outputs.
struct DataProcessorAdapter {
  /// \brief creates a DataProcessorSpec for a concrete actor
  ///
  /// The actor is moved into a std::shared_ptr which is captured by the init and processing callbacks
  /// of the returned spec. Apart from the actor-type label, a "resilient" or "expendable" label is added
  /// according to ActorTraits<ConcreteActor>::sCriticality.
  ///
  /// \param actor              actor to wrap; it is moved from. ConcreteActor is used with std::make_shared and
  ///                           ActorTraits, so it has to be a non-reference type, i.e. pass an rvalue.
  /// \param dataProcessorName  name of the data processor
  /// \param inputs             inputs of the data processor
  /// \param outputs            outputs of the data processor
  /// \param options            options of the data processor
  /// \return the DataProcessorSpec which runs the actor
  template <typename ConcreteActor>
  static o2::framework::DataProcessorSpec
    adapt(ConcreteActor&& actor, std::string&& dataProcessorName, framework::Inputs&& inputs, framework::Outputs&& outputs, framework::Options&& options)
  {
    using traits = ActorTraits<ConcreteActor>;

    // From here on `actor` is in a moved-from state, only actorPtr may be used to reach the actor.
    auto actorPtr = std::make_shared<ConcreteActor>(std::move(actor));

    o2::framework::DataProcessorSpec dataProcessor;

    dataProcessor.name = std::move(dataProcessorName);
    dataProcessor.inputs = std::move(inputs);
    dataProcessor.outputs = std::move(outputs);
    dataProcessor.options = std::move(options);

    dataProcessor.labels = { dataProcessorLabel<ConcreteActor>() };

    if constexpr (traits::sCriticality == Criticality::Resilient) {
      dataProcessor.labels.emplace_back("resilient");
    } else if constexpr (traits::sCriticality == Criticality::Critical) {
      // that's the default in DPL
    } else if constexpr (traits::sCriticality == Criticality::Expendable) {
      dataProcessor.labels.emplace_back("expendable");
    } else if constexpr (traits::sCriticality == Criticality::UserDefined) {
      // ask the actor owned by actorPtr: the `actor` parameter was moved from above
      if (!actorPtr->isCritical()) {
        dataProcessor.labels.emplace_back("expendable");
      } else {
        // we make it resilient so we can support upstream data processors which are either expendable or critical,
        // and hide the unnecessary complexity from the user.
        dataProcessor.labels.emplace_back("resilient");
      }
    }

    // both callbacks hold a copy of actorPtr, which keeps the actor alive as long as the callbacks exist
    dataProcessor.algorithm = {
      [actorPtr](framework::InitContext& ictx) {
        actorPtr->init(ictx);
        return [actorPtr](framework::ProcessingContext& ctx) {
          actorPtr->process(ctx);
        };
      }
    };
    return dataProcessor;
  }

  /// \brief Produces a standard QC Data Processor name for cases when it runs user code and is associated with a detector.
  /// \param userCodeName        name of the user code
  /// \param detectorName        name of the detector
  /// \param actorTypeKebabCase  actor type name in kebab-case
  static std::string dataProcessorName(std::string_view userCodeName, std::string_view detectorName, std::string_view actorTypeKebabCase);

  /// \brief Same as the three-argument overload, with the actor type taken from ActorTraits<ConcreteActor>::sActorTypeKebabCase.
  ///
  /// Available only for actors which run user code and are detector specific.
  template <typename ConcreteActor>
    requires(actor_helpers::runsUserCode<ConcreteActor>() && ActorTraits<ConcreteActor>::sDetectorSpecific)
  static std::string dataProcessorName(std::string_view userCodeName, std::string_view detectorName)
  {
    using traits = ActorTraits<ConcreteActor>;
    return dataProcessorName(userCodeName, detectorName, traits::sActorTypeKebabCase);
  }

  /// \brief Produces a standardized QC Data Processor name for actors which do not run user code or are not detector specific.
  ///
  /// The name is the actor type in kebab-case.
  template <typename ConcreteActor>
    requires(!actor_helpers::runsUserCode<ConcreteActor>() || !ActorTraits<ConcreteActor>::sDetectorSpecific)
  static std::string dataProcessorName()
  {
    using traits = ActorTraits<ConcreteActor>;
    return std::string{ traits::sActorTypeKebabCase };
  }

  /// \brief collects all user inputs in the provided UserCodeConfig(s) and returns framework::Inputs
  /// \param config  a single UserCodeConfig (or a type derived from it) or a range of them
  /// \return inputs of all data sources of all provided configs, in the order of appearance
  /// \throws std::invalid_argument if a data source has a type which is not listed in
  ///         ActorTraits<ConcreteActor>::sConsumedDataSources
  template <typename ConcreteActor, typename ConfigT>
    requires(impl::UserCodeConfigSingle<ConfigT> || impl::UserCodeConfigRange<ConfigT>)
  static framework::Inputs collectUserInputs(ConfigT&& config)
  {
    using traits = ActorTraits<ConcreteActor>;

    // normalize to a range, even if it's a single config
    auto configRange = impl::as_range(std::forward<ConfigT>(config));

    // get a view over all data sources
    auto dataSources = configRange //
                       | std::views::transform([](const UserCodeConfig& userCodeConfig) -> const auto& {
                           return userCodeConfig.dataSources;
                         }) //
                       | std::views::join;

    // validate: every data source has to be of a type which this actor consumes
    auto firstInvalid = std::ranges::find_if(dataSources, [](const DataSourceSpec& dataSource) {
      return std::ranges::none_of(traits::sConsumedDataSources, [&](const DataSourceType& allowed) {
        return dataSource.type == allowed;
      });
    });
    if (firstInvalid != dataSources.end()) {
      throw std::invalid_argument(
        std::format("DataSource '{}' is not one of supported types for '{}'", firstInvalid->id, traits::sActorTypeUpperCamelCase));
    }

    // copy into the results
    framework::Inputs inputs{};
    std::ranges::copy(dataSources                                                                         //
                        | std::views::transform([](const auto& ds) -> const auto& { return ds.inputs; }) //
                        | std::views::join,
                      std::back_inserter(inputs));

    // fixme: CheckRunner might have overlapping or repeating inputs. we should handle that here.
    //   There is some existing code in DataSampling which already does that, it could be copied here.
    return inputs;
  }

  /// \brief collects all user outputs in the provided UserCodeConfig(s) and returns framework::Outputs
  ///
  /// One output is created per config with createUserOutputSpec, using dataSourceType and the config's
  /// detectorName and name. ConcreteActor is not used by the implementation; it stays in the signature
  /// so that existing callers keep compiling.
  ///
  /// \param config  a single UserCodeConfig (or a type derived from it) or a range of them
  /// \return one output per provided config, in the order of appearance
  template <typename ConcreteActor, DataSourceType dataSourceType, typename ConfigT>
    requires(impl::UserCodeConfigSingle<ConfigT> || impl::UserCodeConfigRange<ConfigT>)
  static framework::Outputs collectUserOutputs(ConfigT&& config)
  {
    // normalize to a range, even if it's a single config
    auto configRange = impl::as_range(std::forward<ConfigT>(config));

    framework::Outputs outputs{};
    std::ranges::copy(configRange //
                        | std::views::transform([](const UserCodeConfig& userCodeConfig) {
                            return createUserOutputSpec(dataSourceType, userCodeConfig.detectorName, userCodeConfig.name);
                          }),
                      std::back_inserter(outputs));
    return outputs;
  }

  /// \brief returns the DataProcessorLabel carrying the actor type in kebab-case
  template <typename ConcreteActor>
  static framework::DataProcessorLabel dataProcessorLabel()
  {
    using traits = ActorTraits<ConcreteActor>;
    return { std::string{ traits::sActorTypeKebabCase } };
  }
};
}; // namespace o2::quality_control::core
#endif // QUALITYCONTROL_DATAPROCESSORADAPTER_H