Skip to content

Commit 7012fa3

Browse files
authored
[QC-489] Make sure aggregators are run in the right order (#560)
1 parent 06226e6 commit 7012fa3

7 files changed

Lines changed: 238 additions & 16 deletions

File tree

Framework/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ endforeach()
304304

305305
foreach(t testTaskInterface testWorkflow testTaskRunner testCheckWorkflow
306306
testInfrastructureGenerator testPostProcessingConfig testPostProcessingInterface
307-
testPostProcessingRunner testCheck testCheckRunner testTrendingTask)
307+
testPostProcessingRunner testCheck testCheckRunner testTrendingTask testAggregatorRunner)
308308
target_sources(${t} PRIVATE
309309
${CMAKE_BINARY_DIR}/getTestDataDirectory.cxx)
310310
target_include_directories(${t} PRIVATE ${CMAKE_SOURCE_DIR})

Framework/include/QualityControl/Aggregator.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@ namespace o2::quality_control::checker
3333
{
3434

3535
class AggregatorInterface;
36+
enum AggregatorSourceType { check,
37+
aggregator };
38+
struct AggregatorSource {
39+
AggregatorSource(const std::string& t, const std::string& n);
40+
AggregatorSourceType type;
41+
std::string name;
42+
};
3643

3744
/// \brief An aggregator as found in the configuration.
3845
///
@@ -64,10 +71,13 @@ class Aggregator
6471
std::string getPolicyName() const;
6572
std::vector<std::string> getObjectsNames() const;
6673
bool getAllObjectsOption() const;
74+
std::vector<AggregatorSource> getSources();
75+
std::vector<AggregatorSource> getSources(AggregatorSourceType type);
6776

6877
private:
6978
CheckConfig mAggregatorConfig; // we reuse checkConfig, just consider that Check = Aggregator
7079
AggregatorInterface* mAggregatorInterface = nullptr;
80+
std::vector<AggregatorSource> mSources;
7181
};
7282

7383
} // namespace o2::quality_control::checker

Framework/include/QualityControl/AggregatorRunner.h

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class TClass;
6666

6767
namespace o2::quality_control::checker
6868
{
69+
struct AggregatorSource;
6970

7071
/// \brief The class in charge of running the aggregators on the QualityObjects.
7172
///
@@ -99,8 +100,9 @@ class AggregatorRunner : public framework::Task
99100
/// \brief AggregatorRunner process callback
100101
void run(framework::ProcessingContext& ctx) override;
101102

102-
framework::Inputs getInputs() { return mInputs; };
103-
std::string getDeviceName() { return mDeviceName; };
103+
framework::Inputs getInputs() { return mInputs; }
104+
std::string getDeviceName() { return mDeviceName; }
105+
const std::vector<std::shared_ptr<Aggregator>>& getAggregators() const { return mAggregators; }
104106

105107
static std::string createAggregatorRunnerIdString() { return "QC-AGGREGATOR-RUNNER"; };
106108
static std::string createAggregatorRunnerName();
@@ -128,14 +130,29 @@ class AggregatorRunner : public framework::Task
128130
inline void initServiceDiscovery();
129131
inline void initAggregators();
130132

133+
/**
134+
* Reorder the aggregators stored in mAggregators.
135+
*/
136+
void reorderAggregators();
137+
138+
/**
139+
* Checks whether all sources provided are already in the aggregators vector.
140+
* The match is done by name.
141+
* @param sources
142+
* @param aggregators
143+
* @return true if all sources are found, by name, in the vector of aggregators.
144+
*/
145+
bool areSourcesIn(const std::vector<AggregatorSource>& sources,
146+
const std::vector<std::shared_ptr<Aggregator>>& aggregators);
147+
131148
/**
132149
* Send metrics to the monitoring system if the time has come.
133150
*/
134151
void sendPeriodicMonitoring();
135152

136153
// General state
137154
std::string mDeviceName;
138-
std::map<std::string, std::shared_ptr<Aggregator>> mAggregatorsMap;
155+
std::vector<std::shared_ptr<Aggregator>> mAggregators;
139156
std::shared_ptr<o2::quality_control::repository::DatabaseInterface> mDatabase;
140157
std::shared_ptr<o2::configuration::ConfigurationInterface> mConfigFile;
141158
core::QualityObjectsMapType mQualityObjects; // where we cache the incoming quality objects and the output of the aggregators

Framework/src/Aggregator.cxx

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717
#include "QualityControl/Aggregator.h"
1818
#include "QualityControl/RootClassFactory.h"
1919
#include "QualityControl/AggregatorInterface.h"
20+
#include <Common/Exceptions.h>
2021

2122
using namespace o2::quality_control::checker;
2223
using namespace o2::quality_control::core;
2324
using namespace std;
2425

26+
namespace o2::quality_control::checker
27+
{
28+
2529
Aggregator::Aggregator(const std::string& aggregatorName, const boost::property_tree::ptree& configuration)
2630
{
2731
mAggregatorConfig.name = aggregatorName;
@@ -44,10 +48,7 @@ Aggregator::Aggregator(const std::string& aggregatorName, const boost::property_
4448
if (auto sourceType = dataSource.get<std::string>("type"); sourceType == "Aggregator" || sourceType == "Check") {
4549
auto sourceName = dataSource.get<std::string>("name");
4650
ILOG(Info, Devel) << " Found a source : " << sourceName << ENDM;
47-
48-
cout << "dataSource.count(QOs) : " << dataSource.count("QOs") << endl;
49-
if (dataSource.count("QOs") != 0)
50-
cout << "dataSource.get<std::string>(\"QOs\") : " << dataSource.get<std::string>("QOs") << endl;
51+
mSources.emplace_back(sourceType, sourceName);
5152

5253
if (dataSource.count("QOs") == 0) {
5354
ILOG(Info, Devel) << " (no QOs specified, we take all)" << ENDM;
@@ -123,4 +124,32 @@ std::vector<std::string> Aggregator::getObjectsNames() const
123124
bool Aggregator::getAllObjectsOption() const
124125
{
125126
return mAggregatorConfig.allObjects;
127+
}
128+
129+
std::vector<AggregatorSource> Aggregator::getSources()
130+
{
131+
return mSources;
132+
}
133+
134+
/// \brief Returns the sources of this aggregator that are of the requested type.
///
/// \param type Kind of source to keep (check or aggregator).
/// \return A new vector holding copies of the matching entries of mSources, in their original order.
std::vector<AggregatorSource> Aggregator::getSources(AggregatorSourceType type)
{
  std::vector<AggregatorSource> selected;
  for (const auto& candidate : mSources) {
    if (candidate.type == type) {
      selected.push_back(candidate);
    }
  }
  return selected;
}
142+
143+
/// \brief Builds a source description from the textual type and the name of the source.
///
/// \param t Textual type of the source, either "Aggregator" or "Check".
/// \param n Name of the source.
/// \throws AliceO2::Common::Exception if t is neither "Aggregator" nor "Check".
AggregatorSource::AggregatorSource(const std::string& t, const std::string& n)
  : name(n)
{
  if (t == "Check") {
    type = check;
    return;
  }
  if (t == "Aggregator") {
    type = aggregator;
    return;
  }
  // Any other string is a configuration error.
  BOOST_THROW_EXCEPTION(AliceO2::Common::Exception() << AliceO2::Common::errinfo_details("Unknown type of Aggregator: " + t));
}
154+
126155
}

Framework/src/AggregatorRunner.cxx

Lines changed: 84 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ QualityObjectsType AggregatorRunner::aggregate()
127127
ILOG(Debug, Trace) << "Aggregate called in AggregatorRunner, QOs in cache: " << mQualityObjects.size() << ENDM;
128128

129129
QualityObjectsType allQOs;
130-
for (auto const& [aggregatorName, aggregator] : mAggregatorsMap) {
130+
for (auto const& aggregator : mAggregators) {
131+
string aggregatorName = aggregator->getName();
131132
ILOG(Info, Devel) << "Processing aggregator: " << aggregatorName << ENDM;
132133

133134
if (updatePolicyManager.isReady(aggregatorName)) {
@@ -200,17 +201,93 @@ void AggregatorRunner::initAggregators()
200201

201202
// For every aggregator definition, create an Aggregator
202203
for (const auto& [aggregatorName, aggregatorConfig] : mConfigFile->getRecursive("qc.aggregators")) {
203-
204204
ILOG(Info, Devel) << ">> Aggregator name : " << aggregatorName << ENDM;
205+
205206
if (aggregatorConfig.get<bool>("active", true)) {
206-
// create Aggregator and store it.
207-
auto aggregator = make_shared<Aggregator>(aggregatorName, aggregatorConfig);
208-
aggregator->init();
209-
updatePolicyManager.addPolicy(aggregator->getName(), aggregator->getPolicyName(), aggregator->getObjectsNames(), aggregator->getAllObjectsOption(), false);
210-
mAggregatorsMap[aggregatorName] = aggregator;
207+
try {
208+
auto aggregator = make_shared<Aggregator>(aggregatorName, aggregatorConfig);
209+
aggregator->init();
210+
updatePolicyManager.addPolicy(aggregator->getName(),
211+
aggregator->getPolicyName(),
212+
aggregator->getObjectsNames(),
213+
aggregator->getAllObjectsOption(),
214+
false);
215+
mAggregators.push_back(aggregator);
216+
} catch (...) {
217+
// catch the configuration exception and print it to avoid losing it
218+
ILOG(Error, Ops) << "Error creating aggregator '" << aggregatorName << "'"
219+
<< current_diagnostic(true) << ENDM;
220+
continue; // skip this aggregator, it might still fail fatally later if another aggregator depended on it
221+
}
211222
}
212223
}
213224
}
225+
226+
reorderAggregators();
227+
}
228+
229+
bool AggregatorRunner::areSourcesIn(const std::vector<AggregatorSource>& sources,
230+
const std::vector<std::shared_ptr<Aggregator>>& aggregators)
231+
{
232+
for (auto source : sources) {
233+
auto it = find_if(aggregators.begin(), aggregators.end(),
234+
[&](const std::shared_ptr<Aggregator>& agg) { return (agg->getName() == source.name); });
235+
if (it == aggregators.end()) {
236+
return false;
237+
}
238+
}
239+
240+
return true;
241+
}
242+
243+
void AggregatorRunner::reorderAggregators()
244+
{
245+
// Implementation
246+
// This is a simple, light-weight, but sub-optimal implementation.
247+
// One could build a proper tree (e.g. with boost Graph) and then apply a complex algorithm to order
248+
// the nodes and find cycles.
249+
// Instead this implementation goes through the aggregators and for each checks whether
250+
// there are no dependencies or if they are all fulfilled. If it is the case the aggregator
251+
// is moved at the end of the resulting vector.
252+
// In case we have looped through all the remaining aggregators and nothing has been done,
253+
// ie. no aggregator has its dependencies fulfilled, we stop and raise an error.
254+
// This means that there is a cycle or that one of the dependencies does not exist.
255+
// Note that by "fulfilled" we mean that all the sources of an aggregator are already
256+
// in the result vector.
257+
258+
std::vector<std::shared_ptr<Aggregator>> originals = mAggregators;
259+
std::vector<std::shared_ptr<Aggregator>> results;
260+
bool modificationLastIteration = true;
261+
// As long as there are items in original and we did some modifications in the last iteration
262+
while (!originals.empty() && modificationLastIteration) {
263+
modificationLastIteration = false;
264+
std::vector<std::shared_ptr<Aggregator>> toBeMoved; // we need it because we cannot modify the vectors while iterating over them
265+
// Loop over remaining items in the original list
266+
for (const auto& orig : originals) {
267+
// if no Aggregator dependencies or Aggregator sources are all already in result
268+
auto sources = orig->getSources(aggregator);
269+
if (sources.empty() || areSourcesIn(sources, results)) {
270+
// move from original to result
271+
toBeMoved.push_back(orig);
272+
modificationLastIteration = true;
273+
}
274+
}
275+
// move the items from one vector to the other
276+
for (const auto& item : toBeMoved) {
277+
results.push_back(item);
278+
originals.erase(std::remove(originals.begin(), originals.end(), item), originals.end());
279+
}
280+
}
281+
282+
if (!originals.empty()) {
283+
string msg =
284+
"Error in the aggregators definition : either there is a cycle "
285+
"or an aggregator depends on an aggregator that does not exist.";
286+
ILOG(Error, Ops) << msg << ENDM;
287+
BOOST_THROW_EXCEPTION(FatalException() << errinfo_details(msg));
288+
}
289+
assert(results.size() != mAggregators.size());
290+
mAggregators = results;
214291
}
215292

216293
void AggregatorRunner::sendPeriodicMonitoring()

Framework/test/testAggregatorRunner.cxx

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,13 @@
1313
/// \author Barthelemy von Haller
1414
///
1515

16+
#include "getTestDataDirectory.h"
1617
#include "QualityControl/AggregatorRunnerFactory.h"
1718
#include "QualityControl/AggregatorRunner.h"
19+
#include "QualityControl/Aggregator.h"
20+
#include <Framework/InitContext.h>
21+
#include <Framework/ConfigParamRegistry.h>
22+
#include <Framework/ConfigParamStore.h>
1823

1924
#define BOOST_TEST_MODULE CheckRunner test
2025
#define BOOST_TEST_MAIN
@@ -27,9 +32,30 @@ using namespace std;
2732
using namespace o2::framework;
2833
using namespace o2::header;
2934

30-
BOOST_AUTO_TEST_CASE(test_check_runner_static)
35+
BOOST_AUTO_TEST_CASE(test_aggregator_runner_static)
3136
{
3237
BOOST_CHECK(AggregatorRunner::createAggregatorRunnerDataDescription("qwertyuiop") == DataDescription("qwertyuiop"));
3338
BOOST_CHECK(AggregatorRunner::createAggregatorRunnerDataDescription("012345678901234567890") == DataDescription("0123456789012345"));
3439
BOOST_CHECK_THROW(AggregatorRunner::createAggregatorRunnerDataDescription(""), AliceO2::Common::FatalException);
3540
}
41+
42+
// Initializes an AggregatorRunner from the shared test configuration and verifies that the
// aggregators end up ordered such that each one comes after the aggregators it depends on.
BOOST_AUTO_TEST_CASE(test_aggregator_runner)
{
  std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json";
  AggregatorRunner aggregatorRunner{ configFilePath, { OutputSpec{ { "mo" }, "QC", "abcTask-mo", 123 } } };

  std::unique_ptr<ConfigParamStore> store;
  ConfigParamRegistry cfReg(std::move(store));
  ServiceRegistry sReg;
  InitContext initContext{ cfReg, sReg };
  aggregatorRunner.init(initContext);

  BOOST_CHECK_EQUAL(aggregatorRunner.getDeviceName(), "QC-AGGREGATOR-RUNNER");

  // check the reordering
  const auto& aggregators = aggregatorRunner.getAggregators();
  // Stop here with a clear message if an aggregator is missing, rather than throwing from at() below.
  BOOST_REQUIRE_EQUAL(aggregators.size(), 4u);

  // B and C have no dependency on other aggregators: they come first, in any order, but must differ.
  const std::string first = aggregators.at(0)->getName();
  const std::string second = aggregators.at(1)->getName();
  BOOST_CHECK(first == "MyAggregatorC" || first == "MyAggregatorB");
  BOOST_CHECK(second == "MyAggregatorC" || second == "MyAggregatorB");
  BOOST_CHECK(first != second);
  // A depends on B and C, D depends on A and C.
  BOOST_CHECK(aggregators.at(2)->getName() == "MyAggregatorA");
  BOOST_CHECK(aggregators.at(3)->getName() == "MyAggregatorD");
}

Framework/test/testSharedConfig.json

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,69 @@
181181
"once"
182182
]
183183
}
184+
},
185+
"aggregators": {
186+
"MyAggregatorA": {
187+
"active": "true",
188+
"className": "o2::quality_control_modules::skeleton::SkeletonAggregator",
189+
"moduleName": "QcSkeleton",
190+
"policy": "OnAll",
191+
"detectorName": "TST",
192+
"dataSource": [{
193+
"type": "Aggregator",
194+
"name": "MyAggregatorB", "": "no QOs parameter -> all QOs of this aggregator",
195+
"QOs": ["newQuality"], "": "list all objects we are interested in"
196+
}, {
197+
"type": "Aggregator",
198+
"name": "MyAggregatorC",
199+
"QOs": ["newQuality", "another"], "": "if we omitted the QOs for a data source we would default to OnAny"
200+
}]
201+
},
202+
"MyAggregatorD": {
203+
"active": "true",
204+
"className": "o2::quality_control_modules::skeleton::SkeletonAggregator",
205+
"moduleName": "QcSkeleton",
206+
"policy": "OnAll",
207+
"detectorName": "TST",
208+
"dataSource": [{
209+
"type": "Aggregator",
210+
"name": "MyAggregatorA", "": "no QOs parameter -> all QOs of this aggregator",
211+
"QOs": ["newQuality"], "": "list all objects we are interested in"
212+
}, {
213+
"type": "Aggregator",
214+
"name": "MyAggregatorC",
215+
"QOs": ["newQuality", "another"], "": "if we omitted the QOs for a data source we would default to OnAny"
216+
}]
217+
},
218+
"MyAggregatorC": {
219+
"active": "true",
220+
"className": "o2::quality_control_modules::skeleton::SkeletonAggregator",
221+
"moduleName": "QcSkeleton",
222+
"policy": "OnAll",
223+
"detectorName": "TST",
224+
"dataSource": [{
225+
"type": "Check",
226+
"name": "dataSizeCheck"
227+
}, {
228+
"type": "Check",
229+
"name": "someNumbersCheck"
230+
}]
231+
},
232+
"MyAggregatorB": {
233+
"active": "true",
234+
"className": "o2::quality_control_modules::skeleton::SkeletonAggregator",
235+
"moduleName": "QcSkeleton",
236+
"policy": "OnAll",
237+
"detectorName": "TST",
238+
"dataSource": [{
239+
"type": "Check",
240+
"name": "dataSizeCheck"
241+
}, {
242+
"type": "Check", "": "this one is using onEachSeparately and thus it sends under a full path",
243+
"name": "someNumbersCheck",
244+
"QOs": ["someNumbersTask/example"], "": "also possible to ignore it altogether, meaning we take all objects"
245+
}]
246+
}
184247
}
185248
},
186249
"dataSamplingPolicies": [

0 commit comments

Comments
 (0)