Skip to content

Commit d293bea

Browse files
authored
Ability to merge QC workflows into main processing (#180)
We use the feature of DPL which allows to merge two workflows by piping their executables, like so: o2-qc-run-producer | o2-qc-run-qc --config json://${QUALITYCONTROL_ROOT}/etc/basic.json It allows to attach QC workflow to any other, without the need of having them in the same project or depending on each other.
1 parent fbcb41d commit d293bea

13 files changed

Lines changed: 232 additions & 125 deletions

Framework/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ add_root_dictionary(QualityControl
7979
set(EXE_SRCS
8080
src/runInformationService.cxx
8181
src/runInformationServiceDump.cxx
82+
src/runDataProducer.cxx
83+
src/runQC.cxx
8284
src/runBasic.cxx
8385
src/runAdvanced.cxx
8486
src/runReadout.cxx
@@ -89,6 +91,8 @@ set(EXE_SRCS
8991
set(EXE_NAMES
9092
o2-qc-info-service
9193
o2-qc-info-service-dump
94+
o2-qc-run-producer
95+
o2-qc-run-qc
9296
o2-qc-run-basic
9397
o2-qc-run-advanced
9498
o2-qc-run-readout
@@ -101,6 +105,8 @@ set(EXE_NAMES
101105
set(EXE_OLD_NAMES
102106
qcInfoService
103107
qcInfoServiceDump
108+
qcRunProducer
109+
qcRunQC
104110
qcRunBasic
105111
qcRunAdvanced
106112
qcRunReadout

Framework/basic-no-sampling.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
"maxNumberCycles": "-1",
2929
"dataSource": {
3030
"type": "direct",
31-
"query" : "its-rawdata:ITS/RAWDATA/0"
31+
"query" : "tst-rawdata:TST/RAWDATA/0"
3232
},
3333
"taskParameters": {
3434
"nothing": "rien"
@@ -40,4 +40,4 @@
4040
"dataSamplingPolicies": [
4141

4242
]
43-
}
43+
}

Framework/basic.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
"dataSource_comment": "The other type of dataSource is \"direct\", see basic-no-sampling.json.",
3030
"dataSource": {
3131
"type": "dataSamplingPolicy",
32-
"name": "its-raw"
32+
"name": "tst-raw"
3333
},
3434
"taskParameters": {
3535
"myOwnKey": "myOwnValue"
@@ -40,10 +40,10 @@
4040
},
4141
"dataSamplingPolicies": [
4242
{
43-
"id": "its-raw",
43+
"id": "tst-raw",
4444
"active": "true",
4545
"machines": [],
46-
"query" : "random:ITS/RAWDATA/0",
46+
"query" : "random:TST/RAWDATA/0",
4747
"samplingConditions": [
4848
{
4949
"condition": "random",
@@ -54,4 +54,4 @@
5454
"blocking": "false"
5555
}
5656
]
57-
}
57+
}

Framework/readout-no-sampling.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,4 @@
3636
},
3737
"dataSamplingPolicies": [
3838
]
39-
}
39+
}

Framework/src/InfrastructureGenerator.cxx

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,19 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string co
4040
// ids are assigned to local tasks in order to distinguish monitor objects outputs from each other and be able to
4141
// merge them. If there is no need to merge (only one qc task), it gets subspec 0.
4242
// todo: use matcher for subspec when available in DPL
43-
size_t id = taskConfig.get_child("machines").size() > 1 ? 1 : 0;
44-
for (const auto& machine : taskConfig.get_child("machines")) {
45-
46-
if (machine.second.get<std::string>("") == host) {
47-
// todo: optimize it by using the same ptree?
48-
workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, id, true));
49-
break;
43+
if (host.empty()) {
44+
workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, 0, false));
45+
} else {
46+
size_t id = taskConfig.get_child("machines").size() > 1 ? 1 : 0;
47+
for (const auto& machine : taskConfig.get_child("machines")) {
48+
49+
if (machine.second.get<std::string>("") == host) {
50+
// todo: optimize it by using the same ptree?
51+
workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, id, true));
52+
break;
53+
}
54+
id++;
5055
}
51-
id++;
5256
}
5357
}
5458
}

Framework/src/runAdvanced.cxx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
/// \code{.sh}
2626
/// > aliBuild build QualityControl --defaults o2
2727
/// > alienv enter QualityControl/latest
28-
/// > qcRunAdvanced
28+
/// > o2-qc-run-advanced
2929
/// \endcode
3030
/// If you have glfw installed, you should see a window with the workflow visualization and sub-windows for each Data
3131
/// Processor where their logs can be seen. The processing will continue until the main window is closed. Regardless
@@ -137,8 +137,8 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
137137
// a fix to make the topologies work when merged together
138138
localTopology.back().name += std::to_string(i);
139139
if (i != 2) {
140-
localTopology.back().inputs = { localTopology.back().inputs.back() };
141-
localTopology.back().outputs = { localTopology.back().outputs.back() };
140+
localTopology.back().inputs.erase(localTopology.back().inputs.begin(), localTopology.back().inputs.end() - 1);
141+
localTopology.back().outputs.erase(localTopology.back().outputs.begin(), localTopology.back().outputs.end() - 1);
142142
}
143143
DataSpecUtils::updateMatchingSubspec(localTopology.back().inputs.back(), i);
144144
DataSpecUtils::updateMatchingSubspec(localTopology.back().outputs.back(), i);

Framework/src/runBasic.cxx

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,18 @@
1616
///
1717
/// This is an executable showing QC Task's usage in Data Processing Layer. The workflow consists of data producer,
1818
/// which generates arrays of random size and content. Its output is dispatched to QC task using Data Sampling
19-
/// infrastructure. QC Task runs exemplary user code located in SkeletonDPL. The checker performes a simple check of
19+
/// infrastructure. QC Task runs exemplary user code located in SkeletonDPL. The checker performs a simple check of
2020
/// the histogram shape and colorizes it. The resulting histogram contents are shown in logs by printer.
2121
///
2222
/// QC task and Checker are instantiated by respectively TaskFactory and CheckerFactory,
2323
/// which use preinstalled config file, that can be found in
24-
/// ${QUALITYCONTROL_ROOT}/etc/qcTaskDplConfig.json or Framework/qcTaskDplConfig.json (original one).
24+
/// ${QUALITYCONTROL_ROOT}/etc/basic.json or Framework/basic.json (original one).
2525
///
2626
/// To launch it, build the project, load the environment and run the executable:
2727
/// \code{.sh}
2828
/// > aliBuild build QualityControl --defaults o2
2929
/// > alienv enter QualityControl/latest
30-
/// > runTaskDPL
30+
/// > o2-qc-run-basic
3131
/// \endcode
3232
/// If you have glfw installed, you should see a window with the workflow visualization and sub-windows for each Data
3333
/// Processor where their logs can be seen. The processing will continue until the main window is closed. Regardless
@@ -72,6 +72,7 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
7272

7373
std::string getConfigPath(const ConfigContext& config);
7474

75+
using namespace o2;
7576
using namespace o2::framework;
7677
using namespace o2::quality_control::checker;
7778
using namespace std::chrono;
@@ -86,14 +87,20 @@ WorkflowSpec defineDataProcessing(const ConfigContext& config)
8687
"producer",
8788
Inputs{},
8889
Outputs{
89-
{ "ITS", "RAWDATA", 0, Lifetime::Timeframe } },
90+
{ "TST", "RAWDATA", 0 } },
9091
AlgorithmSpec{
9192
(AlgorithmSpec::InitCallback)[](InitContext&){
93+
// this is the initialization code
9294
std::default_random_engine generator(11);
93-
return (AlgorithmSpec::ProcessCallback)[generator](ProcessingContext& processingContext) mutable {
95+
96+
// after the initialization, we return the processing callback
97+
return (AlgorithmSpec::ProcessCallback)[generator](ProcessingContext & processingContext) mutable
98+
{
99+
// everything inside this lambda function is invoked in a loop, because this Data Processor has no inputs
94100
usleep(100000);
101+
95102
size_t length = generator() % 10000;
96-
auto data = processingContext.outputs().make<char>(Output{ "ITS", "RAWDATA", 0, Lifetime::Timeframe }, length);
103+
auto data = processingContext.outputs().make<char>(Output{ "TST", "RAWDATA" }, length);
97104
for (auto&& item : data) {
98105
item = static_cast<char>(generator());
99106
}
@@ -140,4 +147,4 @@ std::string getConfigPath(const ConfigContext& config)
140147
// Finally build the config path based on the default or the user-base one
141148
std::string path = std::string("json:/") + (userConfigPath.empty() ? defaultConfigPath : userConfigPath);
142149
return path;
143-
}
150+
}

Framework/src/runDataProducer.cxx

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
///
12+
/// \file runDataProducer.cxx
13+
/// \author Piotr Konopka
14+
///
15+
/// \brief This is an executable with a basic data producer in Data Processing Layer.
16+
///
17+
/// This is an executable with a basic random data producer in Data Processing Layer. It does not serve a real purpose
18+
/// on its own, but it can be used as a data source for QC development. For example, one can do:
19+
/// \code{.sh}
20+
/// o2-qc-run-producer | o2-qc-run-qc --config json://${QUALITYCONTROL_ROOT}/etc/basic.json
21+
/// \endcode
22+
///
23+
/// If you have glfw installed, you should see a window with the workflow visualization and sub-windows for each Data
24+
/// Processor where their logs can be seen. The processing will continue until the main window is closed. Regardless
25+
/// of glfw being installed or not, in the terminal all the logs will be shown as well.
26+
27+
#include <random>
28+
#include "Framework/runDataProcessing.h"
29+
30+
using namespace o2;
31+
using namespace o2::framework;
32+
using namespace std::chrono;
33+
34+
// clang-format off
35+
/// \brief Builds a workflow that consists of a single random data producer.
///
/// The producer declares no inputs and one output, TST/RAWDATA/0. Each time its
/// processing callback runs, it waits 100000 microseconds and then creates a
/// char message of a pseudo-random length (0..9999), filled with pseudo-random
/// values.
///
/// \return the workflow containing only the "producer" Data Processor
WorkflowSpec defineDataProcessing(const ConfigContext&)
{
  // Initialization step: prepares the random engine and hands back the
  // callback which will be used for the actual processing.
  auto initialize = [](InitContext&) {
    // the seed is fixed, so the generated sequence is the same in every run
    std::default_random_engine rng(11);

    // The engine is captured by value and the lambda is mutable, so the
    // callback keeps advancing its own copy of the engine between invocations.
    return static_cast<AlgorithmSpec::ProcessCallback>(
      [rng](ProcessingContext& context) mutable {
        // everything inside this lambda function is invoked in a loop,
        // because this Data Processor has no inputs
        usleep(100000);

        size_t payloadSize = rng() % 10000;
        auto payload = context.outputs().make<char>(Output{ "TST", "RAWDATA" }, payloadSize);
        for (auto&& element : payload) {
          element = static_cast<char>(rng());
        }
      });
  };

  // The producer to generate some data in the workflow
  DataProcessorSpec producer{
    "producer",
    Inputs{},
    Outputs{
      { "TST", "RAWDATA", 0 } },
    AlgorithmSpec{
      static_cast<AlgorithmSpec::InitCallback>(initialize) }
  };

  WorkflowSpec workflow;
  workflow.push_back(producer);

  return workflow;
}
69+
// clang-format on

Framework/src/runQC.cxx

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
///
12+
/// \file runQC.cxx
13+
/// \author Piotr Konopka
14+
///
15+
/// \brief This is an executable which generates a QC topology given a configuration file.
16+
///
17+
/// This is an executable which generates a QC topology given a configuration file. It can be attached to any other
18+
/// topology which can provide data to Data Sampling and QC. This also means that it cannot work on its own, as it would
19+
/// lack input data. A typical usage would be:
20+
/// \code{.sh}
21+
/// o2-qc-run-producer | o2-qc-run-qc --config json://${QUALITYCONTROL_ROOT}/etc/basic.json
22+
/// \endcode
23+
/// Please refer to Framework/example-default.json and Framework/basic.json to see how to configure a QC topology.
24+
/// To generate only the local part of the topology (which would run on main processing servers) use the '--local' flag.
25+
/// Similarly, to generate only the remote part (running on QC servers) add '--remote'. By default, the executable
26+
/// generates both local and remote topologies, as it is the usual use-case for local development.
27+
28+
#include "Framework/DataSampling.h"
29+
using namespace o2::framework;
30+
31+
void customize(std::vector<ConfigParamSpec>& workflowOptions)
32+
{
33+
workflowOptions.push_back(
34+
ConfigParamSpec{ "config", VariantType::String, "", { "Path to QC and Data Sampling configuration file." } });
35+
36+
workflowOptions.push_back(
37+
ConfigParamSpec{ "local", VariantType::Bool, false, { "Creates only the local part of the QC topology." } });
38+
workflowOptions.push_back(
39+
ConfigParamSpec{ "host", VariantType::String, "", { "Name of the host of the local part of the QC topology."
40+
"Necessary to specify when creating topologies on multiple"
41+
" machines, can be omitted for the local development" } });
42+
workflowOptions.push_back(
43+
ConfigParamSpec{ "remote", VariantType::Bool, false, { "Creates only the remote part of the QC topology." } });
44+
}
45+
46+
#include <FairLogger.h>
47+
#include <TH1F.h>
48+
#include <memory>
49+
#include <random>
50+
51+
#include "Framework/runDataProcessing.h"
52+
53+
#include "QualityControl/Checker.h"
54+
#include "QualityControl/InfrastructureGenerator.h"
55+
56+
using namespace o2;
57+
using namespace o2::framework;
58+
using namespace o2::quality_control::checker;
59+
using namespace std::chrono;
60+
61+
/// \brief Generates the QC topology described by the configuration file given with '--config'.
///
/// The QC infrastructure is divided into two parts:
///  - local - QC tasks which are on the same machines as the main processing, together with Data Sampling,
///  - remote - QC tasks, mergers and checkers that reside on QC servers.
///
/// With '--local' only the local part is generated, with '--remote' only the remote one.
/// When neither or both flags are given, both parts are generated.
///
/// \param config context which gives access to the workflow options
/// \return the generated workflow
WorkflowSpec defineDataProcessing(const ConfigContext& config)
{
  // small helper to read a boolean workflow option by its name
  auto isFlagSet = [&config](const char* name) {
    return config.options().get<bool>(name);
  };

  WorkflowSpec workflow;

  const std::string configSource = config.options().get<std::string>("config");
  LOG(INFO) << "Using config file '" << configSource << "'";

  const bool localRequested = isFlagSet("local");
  const bool remoteRequested = isFlagSet("remote");

  if (localRequested && remoteRequested) {
    LOG(INFO) << "To create both local and remote QC topologies, one does not have to add any of '--local' or '--remote' flags.";
  }

  // A part is generated when it was requested explicitly or when the other one was not requested.
  const bool buildLocalPart = localRequested || !remoteRequested;
  const bool buildRemotePart = remoteRequested || !localRequested;

  if (buildLocalPart) {
    // Generation of Data Sampling infrastructure
    DataSampling::GenerateInfrastructure(workflow, configSource);

    // Generation of the local QC topology (local QC tasks)
    quality_control::generateLocalInfrastructure(workflow, configSource, config.options().get<std::string>("host"));
  }

  if (buildRemotePart) {
    // Generation of the remote QC topology (task for QC servers, mergers and all checkers)
    quality_control::generateRemoteInfrastructure(workflow, configSource);
  }

  return workflow;
}

0 commit comments

Comments
 (0)