Skip to content

Commit 1b9206b

Browse files
authored
Revert "[QC-858] Single CheckRunner per workflow (#2028)" (#2059)
This reverts commit 488c547.
1 parent 5b94c33 commit 1b9206b

9 files changed

Lines changed: 258 additions & 56 deletions

File tree

Framework/include/QualityControl/CheckConfig.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ struct CheckConfig {
3838
UpdatePolicyType policyType = UpdatePolicyType::OnAny;
3939
std::vector<std::string> objectNames{}; // fixme: if object names are empty, allObjects are true, consider reducing to one var
4040
bool allObjects = false;
41+
bool allowBeautify = false;
4142
framework::Inputs inputSpecs{};
4243
framework::OutputSpec qoSpec{ "XXX", "INVALID" };
4344
std::string conditionUrl{};

Framework/include/QualityControl/CheckRunner.h

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "QualityControl/CheckRunnerConfig.h"
3535
#include "QualityControl/Check.h"
3636
#include "QualityControl/MonitorObject.h"
37+
#include "QualityControl/QualityObject.h"
3738
#include "QualityControl/UpdatePolicyManager.h"
3839

3940
namespace o2::quality_control::core
@@ -78,6 +79,7 @@ namespace o2::quality_control::checker
7879
class CheckRunner : public framework::Task
7980
{
8081
public:
82+
/// Constructor
8183
/**
8284
* \brief CheckRunner constructor
8385
*
@@ -86,9 +88,20 @@ class CheckRunner : public framework::Task
8688
* Group check assumes that the input of the checks is the same!
8789
*
8890
* @param checkRunnerConfig configuration of CheckRunner
91+
* @param checkConfigs configuration of all Checks that should run in this data processor
92+
*/
93+
CheckRunner(CheckRunnerConfig, const std::vector<CheckConfig>& checkConfigs);
94+
95+
/**
96+
* \brief CheckRunner constructor
97+
*
98+
* Create a sink for the Input. It is expected to receive Monitor Object to store.
99+
* It will not run any checks on a given input.
100+
*
101+
* @param checkRunnerConfig configuration of CheckRunner
89102
* @param input Monitor Object input spec.
90103
*/
91-
CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs, o2::framework::Inputs inputs);
104+
CheckRunner(CheckRunnerConfig, o2::framework::InputSpec input);
92105

93106
/// Destructor
94107
~CheckRunner() override;
@@ -105,16 +118,19 @@ class CheckRunner : public framework::Task
105118
framework::Inputs getInputs() { return mInputs; };
106119
framework::Outputs getOutputs() { return mOutputs; };
107120

121+
void setTaskStoreSet(std::unordered_set<std::string> storeSet) { mInputStoreSet = storeSet; }
108122
std::string getDeviceName() { return mDeviceName; };
109123

110124
static framework::DataProcessorLabel getCheckRunnerLabel() { return { "qc-check" }; }
111125
static std::string createCheckRunnerIdString() { return "qc-check"; };
112-
static std::string createCheckRunnerName();
126+
static std::string createCheckRunnerName(const std::vector<CheckConfig>& checks);
127+
static std::string createSinkCheckRunnerName(o2::framework::InputSpec input);
128+
static std::string createCheckRunnerFacility(std::string deviceName);
113129

114130
/// \brief Compute the detector name to be used for this checkrunner.
115131
/// Compute the detector name to be used for this checkrunner.
116132
/// If all checks belong to the same detector we use it, otherwise we use "MANY"
117-
static std::string getDetectorName(const std::vector<CheckConfig>& checks);
133+
static std::string getDetectorName(const std::vector<CheckConfig> checks);
118134

119135
private:
120136
/**
@@ -205,7 +221,8 @@ class CheckRunner : public framework::Task
205221
std::shared_ptr<Activity> mActivity; // shareable with the Checks
206222
CheckRunnerConfig mConfig;
207223
std::shared_ptr<o2::quality_control::repository::DatabaseInterface> mDatabase;
208-
std::vector<std::shared_ptr<MonitorObject>> mToBeStored;
224+
std::unordered_set<std::string> mInputStoreSet;
225+
std::vector<std::shared_ptr<MonitorObject>> mMonitorObjectStoreVector;
209226
UpdatePolicyManager updatePolicyManager;
210227
bool mReceivedEOS = false;
211228

Framework/include/QualityControl/CheckRunnerFactory.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,17 @@ class CheckRunnerFactory
4343
CheckRunnerFactory() = default;
4444
virtual ~CheckRunnerFactory() = default;
4545

46-
static framework::DataProcessorSpec create(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs);
46+
static framework::DataProcessorSpec create(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs, std::vector<std::string> storeVector = {});
47+
48+
/*
49+
* \brief Create a CheckRunner sink DPL device.
50+
*
51+
* The purpose of this device is to receive and store the MO from task.
52+
*
53+
* @param input InputSpec with the content to store
54+
* @param configurationSource
55+
*/
56+
static framework::DataProcessorSpec createSinkDevice(const CheckRunnerConfig& checkRunnerConfig, const o2::framework::InputSpec& input);
4757

4858
static CheckRunnerConfig extractConfig(const core::CommonSpec&);
4959

Framework/src/Check.cxx

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,10 @@ QualityObjectsType Check::check(std::map<std::string, std::shared_ptr<MonitorObj
172172

173173
void Check::beautify(std::map<std::string, std::shared_ptr<MonitorObject>>& moMap, const Quality& quality)
174174
{
175+
if (!mCheckConfig.allowBeautify) {
176+
return;
177+
}
178+
175179
for (auto const& item : moMap) {
176180
try {
177181
mCheckInterface->beautify(item.second /*mo*/, quality);
@@ -232,6 +236,12 @@ CheckConfig Check::extractConfig(const CommonSpec& commonSpec, const CheckSpec&
232236
}
233237
}
234238

239+
bool allowBeautify = checkSpec.dataSources.size() <= 1;
240+
if (!allowBeautify) {
241+
// See QC-299 for details
242+
ILOG(Warning, Devel) << "Beautification disabled because more than one source is used in this Check (" << checkSpec.checkName << ")" << ENDM;
243+
}
244+
235245
return {
236246
checkSpec.checkName,
237247
checkSpec.moduleName,
@@ -241,6 +251,7 @@ CheckConfig Check::extractConfig(const CommonSpec& commonSpec, const CheckSpec&
241251
updatePolicy,
242252
std::move(objectNames),
243253
checkAllObjects,
254+
allowBeautify,
244255
std::move(inputs),
245256
createOutputSpec(checkSpec.checkName),
246257
commonSpec.conditionDBUrl

Framework/src/CheckRunner.cxx

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,56 @@ std::size_t CheckRunner::hash(const std::string& inputString)
6969
return checksum;
7070
}
7171

72-
std::string CheckRunner::createCheckRunnerName()
72+
std::string CheckRunner::createCheckRunnerName(const std::vector<CheckConfig>& checks)
7373
{
74-
return CheckRunner::createCheckRunnerIdString();
74+
static const std::string alphanumeric =
75+
"0123456789"
76+
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
77+
"abcdefghijklmnopqrstuvwxyz";
78+
const int NAME_LEN = 4;
79+
std::string name(CheckRunner::createCheckRunnerIdString() + "-" + getDetectorName(checks) + "-");
80+
81+
if (checks.size() == 1) {
82+
// If single check, use the check name
83+
name += checks[0].name;
84+
} else {
85+
std::string hash_string;
86+
std::vector<std::string> names;
87+
// Fill vector with check names
88+
for (const auto& c : checks) {
89+
names.push_back(c.name);
90+
}
91+
// Be sure that after configuration shuffle, the name will be the same
92+
std::sort(names.begin(), names.end());
93+
94+
// Create a single string and hash it
95+
for (auto& n : names) {
96+
hash_string += n;
97+
}
98+
std::size_t num = hash(hash_string);
99+
100+
// Change numerical to alphanumeric hash representation
101+
for (int i = 0; i < NAME_LEN; ++i) {
102+
name += alphanumeric[num % alphanumeric.size()];
103+
num = num / alphanumeric.size();
104+
}
105+
}
106+
return name;
107+
}
108+
109+
std::string CheckRunner::createCheckRunnerFacility(std::string deviceName)
110+
{
111+
// it starts with "check/" and is followed by the unique part of the device name truncated to a maximum of 32 characters.
112+
string facilityName = "check/" + deviceName.substr(CheckRunner::createCheckRunnerIdString().length() + 1, string::npos);
113+
facilityName = facilityName.substr(0, 32);
114+
return facilityName;
115+
}
116+
117+
std::string CheckRunner::createSinkCheckRunnerName(InputSpec input)
118+
{
119+
std::string name(CheckRunner::createCheckRunnerIdString() + "-sink-");
120+
name += DataSpecUtils::label(input);
121+
return name;
75122
}
76123

77124
o2::framework::Outputs CheckRunner::collectOutputs(const std::vector<CheckConfig>& checkConfigs)
@@ -83,12 +130,13 @@ o2::framework::Outputs CheckRunner::collectOutputs(const std::vector<CheckConfig
83130
return outputs;
84131
}
85132

86-
CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs, o2::framework::Inputs inputs)
133+
CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs)
87134
: mDetectorName(getDetectorName(checkConfigs)),
88-
mDeviceName(createCheckRunnerName()),
135+
mDeviceName(createCheckRunnerName(checkConfigs)),
89136
mConfig(std::move(checkRunnerConfig)),
90-
mInputs{ inputs },
91-
mOutputs{ CheckRunner::collectOutputs(checkConfigs) },
137+
/* All checks have the same Input */
138+
mInputs(checkConfigs.front().inputSpecs),
139+
mOutputs(CheckRunner::collectOutputs(checkConfigs)),
92140
mTotalNumberObjectsReceived(0),
93141
mTotalNumberCheckExecuted(0),
94142
mTotalNumberQOStored(0),
@@ -100,6 +148,19 @@ CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, const std::vector<
100148
}
101149
}
102150

151+
CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, InputSpec input)
152+
: mDeviceName(createSinkCheckRunnerName(input)),
153+
mConfig(std::move(checkRunnerConfig)),
154+
mInputs{ input },
155+
mOutputs{},
156+
mTotalNumberObjectsReceived(0),
157+
mTotalNumberCheckExecuted(0),
158+
mTotalNumberQOStored(0),
159+
mTotalNumberMOStored(0),
160+
mTotalQOSent(0)
161+
{
162+
}
163+
103164
CheckRunner::~CheckRunner()
104165
{
105166
ILOG(Debug, Trace) << "CheckRunner destructor (" << this << ")" << ENDM;
@@ -151,7 +212,7 @@ void CheckRunner::refreshConfig(InitContext& iCtx)
151212
void CheckRunner::init(framework::InitContext& iCtx)
152213
{
153214
try {
154-
core::initInfologger(iCtx, mConfig.infologgerDiscardParameters, mDeviceName);
215+
core::initInfologger(iCtx, mConfig.infologgerDiscardParameters, createCheckRunnerFacility(mDeviceName));
155216
refreshConfig(iCtx);
156217
Bookkeeping::getInstance().init(mConfig.bookkeepingUrl);
157218
initDatabase();
@@ -197,7 +258,7 @@ void CheckRunner::run(framework::ProcessingContext& ctx)
197258

198259
auto now = getCurrentTimestamp();
199260
store(qualityObjects, now);
200-
store(mToBeStored, now);
261+
store(mMonitorObjectStoreVector, now);
201262

202263
send(qualityObjects, ctx.outputs());
203264

@@ -209,7 +270,7 @@ void CheckRunner::run(framework::ProcessingContext& ctx)
209270

210271
void CheckRunner::prepareCacheData(framework::InputRecord& inputRecord)
211272
{
212-
mToBeStored.clear();
273+
mMonitorObjectStoreVector.clear();
213274

214275
for (const auto& input : mInputs) {
215276
auto dataRef = inputRecord.get(input.binding.c_str());
@@ -239,6 +300,7 @@ void CheckRunner::prepareCacheData(framework::InputRecord& inputRecord)
239300

240301
// for each item of the array, check whether it is a MonitorObject. If not, create one and encapsulate.
241302
// Then, store the MonitorObject in the various maps and vectors we will use later.
303+
bool store = mInputStoreSet.count(DataSpecUtils::label(input)) > 0; // Check if this CheckRunner stores this input
242304
for (const auto tObject : *array) {
243305
std::shared_ptr<MonitorObject> mo{ dynamic_cast<MonitorObject*>(tObject) };
244306

@@ -256,8 +318,9 @@ void CheckRunner::prepareCacheData(framework::InputRecord& inputRecord)
256318
updatePolicyManager.updateObjectRevision(mo->getFullName());
257319
mTotalNumberObjectsReceived++;
258320

259-
// Monitor Object will be stored later, after possible beautification
260-
mToBeStored.push_back(mo);
321+
if (store) { // Monitor Object will be stored later, after possible beautification
322+
mMonitorObjectStoreVector.push_back(mo);
323+
}
261324
}
262325
}
263326
}
@@ -288,7 +351,7 @@ void CheckRunner::sendPeriodicMonitoring()
288351

289352
QualityObjectsType CheckRunner::check()
290353
{
291-
ILOG(Debug, Devel) << "check(): Trying " << mChecks.size() << " checks for " << mMonitorObjects.size() << " monitor objects"
354+
ILOG(Debug, Devel) << "Trying " << mChecks.size() << " checks for " << mMonitorObjects.size() << " monitor objects"
292355
<< ENDM;
293356

294357
QualityObjectsType allQOs;
@@ -491,7 +554,7 @@ void CheckRunner::reset()
491554
mTotalQOSent = 0;
492555
}
493556

494-
std::string CheckRunner::getDetectorName(const std::vector<CheckConfig>& checks)
557+
std::string CheckRunner::getDetectorName(const std::vector<CheckConfig> checks)
495558
{
496559
std::string detectorName;
497560
for (auto& check : checks) {

Framework/src/CheckRunnerFactory.cxx

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,34 +26,17 @@
2626
#include "QualityControl/CheckRunner.h"
2727
#include "QualityControl/CheckRunnerFactory.h"
2828
#include "QualityControl/CommonSpec.h"
29-
#include <set>
3029

3130
namespace o2::quality_control::checker
3231
{
3332

3433
using namespace o2::framework;
3534

36-
DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs)
35+
DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig, const std::vector<CheckConfig>& checkConfigs, std::vector<std::string> storeVector)
3736
{
3837
auto options = checkRunnerConfig.options;
39-
40-
// concatenate all inputs
41-
o2::framework::Inputs allInputs;
42-
for (auto config : checkConfigs) {
43-
allInputs.insert(allInputs.end(), config.inputSpecs.begin(), config.inputSpecs.end());
44-
}
45-
46-
// We can end up with duplicated inputs that will later lead to circular dependencies on the checkRunner device.
47-
o2::framework::Inputs allInputsNoDups;
48-
std::set<std::string> alreadySeen;
49-
for (auto input : allInputs) {
50-
if (alreadySeen.count(input.binding) == 0) {
51-
allInputsNoDups.push_back(input);
52-
}
53-
alreadySeen.insert(input.binding);
54-
}
55-
56-
CheckRunner qcCheckRunner{ std::move(checkRunnerConfig), checkConfigs, allInputsNoDups };
38+
CheckRunner qcCheckRunner{ std::move(checkRunnerConfig), checkConfigs };
39+
qcCheckRunner.setTaskStoreSet({ storeVector.begin(), storeVector.end() });
5740

5841
DataProcessorSpec newCheckRunner{ qcCheckRunner.getDeviceName(),
5942
qcCheckRunner.getInputs(),
@@ -66,6 +49,22 @@ DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig
6649
return newCheckRunner;
6750
}
6851

52+
DataProcessorSpec CheckRunnerFactory::createSinkDevice(const CheckRunnerConfig& checkRunnerConfig, const o2::framework::InputSpec& input)
53+
{
54+
CheckRunner qcCheckRunner{ checkRunnerConfig, input };
55+
qcCheckRunner.setTaskStoreSet({ DataSpecUtils::label(input) });
56+
57+
DataProcessorSpec newCheckRunner{ qcCheckRunner.getDeviceName(),
58+
qcCheckRunner.getInputs(),
59+
Outputs{ qcCheckRunner.getOutputs() },
60+
adaptFromTask<CheckRunner>(std::move(qcCheckRunner)),
61+
checkRunnerConfig.options,
62+
{},
63+
{ o2::framework::ecs::qcReconfigurable } };
64+
65+
return newCheckRunner;
66+
}
67+
6968
void CheckRunnerFactory::customizeInfrastructure(std::vector<framework::CompletionPolicy>& policies)
7069
{
7170
auto matcher = [label = CheckRunner::getCheckRunnerLabel()](framework::DeviceSpec const& device) {

0 commit comments

Comments
 (0)