Skip to content

Commit dc53b94

Browse files
authored
[QC-443] Give only ptrees to Data Sampling (#801)
1 parent b0ae952 commit dc53b94

14 files changed

Lines changed: 155 additions & 118 deletions

File tree

Framework/include/QualityControl/InfrastructureSpecReader.h

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -34,12 +34,12 @@ class InfrastructureSpecReader
3434
public:
3535
/// \brief Reads the full QC configuration file.
3636
// todo remove configurationSource when it is possible
37-
static InfrastructureSpec readInfrastructureSpec(const boost::property_tree::ptree&, const std::string& configurationSource);
37+
static InfrastructureSpec readInfrastructureSpec(const boost::property_tree::ptree& wholeTree, const std::string& configurationSource);
3838

3939
// readers for separate parts
40-
static CommonSpec readCommonSpec(const boost::property_tree::ptree& config, const std::string& configurationSource);
41-
static TaskSpec readTaskSpec(std::string taskName, const boost::property_tree::ptree& taskSpec, const std::string& configurationSource);
42-
static DataSourceSpec readDataSourceSpec(const boost::property_tree::ptree& dataSourceSpec, const std::string& configurationSource);
40+
static CommonSpec readCommonSpec(const boost::property_tree::ptree& commonTree, const std::string& configurationSource);
41+
static TaskSpec readTaskSpec(std::string taskName, const boost::property_tree::ptree& taskTree, const boost::property_tree::ptree& wholeTree);
42+
static DataSourceSpec readDataSourceSpec(const boost::property_tree::ptree& dataSourceTree, const boost::property_tree::ptree& wholeTree);
4343

4444
static std::string validateDetectorName(std::string name);
4545
};

Framework/src/InfrastructureGenerator.cxx

Lines changed: 25 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -156,16 +156,18 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string co
156156
}
157157
}
158158

159-
// Creating Data Sampling Policies proxies
160-
for (const auto& [policyName, control] : samplingPoliciesUsed) {
161-
// todo: leave only the new way once the return type is changed
162-
std::string port = std::to_string(std::optional<uint16_t>(DataSampling::PortForPolicy(config.get(), policyName)).value_or(defaultPolicyPort));
163-
Inputs inputSpecs = DataSampling::InputSpecsForPolicy(config.get(), policyName);
164-
165-
std::vector<std::string> machines = DataSampling::MachinesForPolicy(config.get(), policyName);
166-
for (const auto& machine : machines) {
167-
if (machine == targetHost) {
168-
generateDataSamplingPolicyLocalProxy(workflow, policyName, inputSpecs, machine, port, control);
159+
if (!samplingPoliciesUsed.empty()) {
160+
auto dataSamplingTree = config->getRecursive("dataSamplingPolicies");
161+
// Creating Data Sampling Policies proxies
162+
for (const auto& [policyName, control] : samplingPoliciesUsed) {
163+
std::string port = std::to_string(DataSampling::PortForPolicy(dataSamplingTree, policyName).value_or(defaultPolicyPort));
164+
Inputs inputSpecs = DataSampling::InputSpecsForPolicy(dataSamplingTree, policyName);
165+
166+
std::vector<std::string> machines = DataSampling::MachinesForPolicy(dataSamplingTree, policyName);
167+
for (const auto& machine : machines) {
168+
if (machine == targetHost) {
169+
generateDataSamplingPolicyLocalProxy(workflow, policyName, inputSpecs, machine, port, control);
170+
}
169171
}
170172
}
171173
}
@@ -231,17 +233,19 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur
231233
}
232234
}
233235

234-
// Creating Data Sampling Policies proxies
235-
for (const auto& [policyName, control] : samplingPoliciesUsed) {
236-
// todo now we have to generate one proxy per local machine and policy, because of the proxy limitations.
237-
// Use one proxy per policy when it is possible.
238-
239-
// todo: leave only the new way once the return type is changed
240-
std::string port = std::to_string(std::optional<uint16_t>(DataSampling::PortForPolicy(config.get(), policyName)).value_or(defaultPolicyPort));
241-
Outputs outputSpecs = DataSampling::OutputSpecsForPolicy(config.get(), policyName);
242-
std::vector<std::string> machines = DataSampling::MachinesForPolicy(config.get(), policyName);
243-
for (const auto& machine : machines) {
244-
generateDataSamplingPolicyRemoteProxy(workflow, policyName, outputSpecs, machine, port, control);
236+
if (!samplingPoliciesUsed.empty()) {
237+
auto dataSamplingTree = config->getRecursive("dataSamplingPolicies");
238+
// Creating Data Sampling Policies proxies
239+
for (const auto& [policyName, control] : samplingPoliciesUsed) {
240+
// todo now we have to generate one proxy per local machine and policy, because of the proxy limitations.
241+
// Use one proxy per policy when it is possible.
242+
243+
std::string port = std::to_string(DataSampling::PortForPolicy(dataSamplingTree, policyName).value_or(defaultPolicyPort));
244+
Outputs outputSpecs = DataSampling::OutputSpecsForPolicy(dataSamplingTree, policyName);
245+
std::vector<std::string> machines = DataSampling::MachinesForPolicy(dataSamplingTree, policyName);
246+
for (const auto& machine : machines) {
247+
generateDataSamplingPolicyRemoteProxy(workflow, policyName, outputSpecs, machine, port, control);
248+
}
245249
}
246250
}
247251

Framework/src/InfrastructureSpecReader.cxx

Lines changed: 46 additions & 46 deletions
Original file line number | Diff line number | Diff line change
@@ -26,10 +26,10 @@ using namespace o2::framework;
2626
namespace o2::quality_control::core
2727
{
2828

29-
InfrastructureSpec InfrastructureSpecReader::readInfrastructureSpec(const boost::property_tree::ptree& config, const std::string& configurationSource)
29+
InfrastructureSpec InfrastructureSpecReader::readInfrastructureSpec(const boost::property_tree::ptree& wholeTree, const std::string& configurationSource)
3030
{
3131
InfrastructureSpec spec;
32-
const auto& qcTree = config.get_child("qc");
32+
const auto& qcTree = wholeTree.get_child("qc");
3333
if (qcTree.find("config") != qcTree.not_found()) {
3434
spec.common = readCommonSpec(qcTree.get_child("config"), configurationSource);
3535
} else {
@@ -39,32 +39,32 @@ InfrastructureSpec InfrastructureSpecReader::readInfrastructureSpec(const boost:
3939
const auto& tasksTree = qcTree.get_child("tasks");
4040
spec.tasks.reserve(tasksTree.size());
4141
for (const auto& [taskName, taskConfig] : tasksTree) {
42-
spec.tasks.push_back(readTaskSpec(taskName, taskConfig, configurationSource));
42+
spec.tasks.push_back(readTaskSpec(taskName, taskConfig, wholeTree));
4343
}
4444
}
4545
return spec;
4646
}
4747

48-
CommonSpec InfrastructureSpecReader::readCommonSpec(const boost::property_tree::ptree& config, const std::string& configurationSource)
48+
CommonSpec InfrastructureSpecReader::readCommonSpec(const boost::property_tree::ptree& commonTree, const std::string& configurationSource)
4949
{
5050
CommonSpec spec;
51-
for (const auto& [key, value] : config.get_child("database")) {
51+
for (const auto& [key, value] : commonTree.get_child("database")) {
5252
spec.database.emplace(key, value.get_value<std::string>());
5353
}
54-
spec.activityNumber = config.get<int>("Activity.number", spec.activityNumber);
55-
spec.activityType = config.get<int>("Activity.type", spec.activityType);
56-
spec.monitoringUrl = config.get<std::string>("monitoring.url", spec.monitoringUrl);
57-
spec.consulUrl = config.get<std::string>("consul.url", spec.consulUrl);
58-
spec.conditionDBUrl = config.get<std::string>("conditionDB.url", spec.conditionDBUrl);
59-
spec.infologgerFilterDiscardDebug = config.get<bool>("infologger.filterDiscardDebug", spec.infologgerFilterDiscardDebug);
60-
spec.infologgerDiscardLevel = config.get<int>("infologger.filterDiscardLevel", spec.infologgerDiscardLevel);
54+
spec.activityNumber = commonTree.get<int>("Activity.number", spec.activityNumber);
55+
spec.activityType = commonTree.get<int>("Activity.type", spec.activityType);
56+
spec.monitoringUrl = commonTree.get<std::string>("monitoring.url", spec.monitoringUrl);
57+
spec.consulUrl = commonTree.get<std::string>("consul.url", spec.consulUrl);
58+
spec.conditionDBUrl = commonTree.get<std::string>("conditionDB.url", spec.conditionDBUrl);
59+
spec.infologgerFilterDiscardDebug = commonTree.get<bool>("infologger.filterDiscardDebug", spec.infologgerFilterDiscardDebug);
60+
spec.infologgerDiscardLevel = commonTree.get<int>("infologger.filterDiscardLevel", spec.infologgerDiscardLevel);
6161

6262
spec.configurationSource = configurationSource;
6363

6464
return spec;
6565
}
6666

67-
TaskSpec InfrastructureSpecReader::readTaskSpec(std::string taskName, const boost::property_tree::ptree& config, const std::string& configurationSource)
67+
TaskSpec InfrastructureSpecReader::readTaskSpec(std::string taskName, const boost::property_tree::ptree& taskTree, const boost::property_tree::ptree& wholeTree)
6868
{
6969
static std::unordered_map<std::string, TaskLocationSpec> const taskLocationFromString = {
7070
{ "local", TaskLocationSpec::Local },
@@ -74,52 +74,52 @@ TaskSpec InfrastructureSpecReader::readTaskSpec(std::string taskName, const boos
7474
TaskSpec ts;
7575

7676
ts.taskName = taskName;
77-
ts.className = config.get<std::string>("className");
78-
ts.moduleName = config.get<std::string>("moduleName");
79-
ts.detectorName = config.get<std::string>("detectorName");
80-
ts.cycleDurationSeconds = config.get<int>("cycleDurationSeconds");
81-
ts.dataSource = readDataSourceSpec(config.get_child("dataSource"), configurationSource);
82-
83-
ts.active = config.get<bool>("active", ts.active);
84-
ts.maxNumberCycles = config.get<int>("maxNumberCycles", ts.maxNumberCycles);
85-
ts.resetAfterCycles = config.get<size_t>("resetAfterCycles", ts.resetAfterCycles);
86-
ts.saveObjectsToFile = config.get<std::string>("saveObjectsToFile", ts.saveObjectsToFile);
87-
if (config.count("taskParameters") > 0) {
88-
for (const auto& [key, value] : config.get_child("taskParameters")) {
77+
ts.className = taskTree.get<std::string>("className");
78+
ts.moduleName = taskTree.get<std::string>("moduleName");
79+
ts.detectorName = taskTree.get<std::string>("detectorName");
80+
ts.cycleDurationSeconds = taskTree.get<int>("cycleDurationSeconds");
81+
ts.dataSource = readDataSourceSpec(taskTree.get_child("dataSource"), wholeTree);
82+
83+
ts.active = taskTree.get<bool>("active", ts.active);
84+
ts.maxNumberCycles = taskTree.get<int>("maxNumberCycles", ts.maxNumberCycles);
85+
ts.resetAfterCycles = taskTree.get<size_t>("resetAfterCycles", ts.resetAfterCycles);
86+
ts.saveObjectsToFile = taskTree.get<std::string>("saveObjectsToFile", ts.saveObjectsToFile);
87+
if (taskTree.count("taskParameters") > 0) {
88+
for (const auto& [key, value] : taskTree.get_child("taskParameters")) {
8989
ts.customParameters.emplace(key, value.get_value<std::string>());
9090
}
9191
}
9292

93-
bool multinodeSetup = config.find("location") != config.not_found();
94-
ts.location = taskLocationFromString.at(config.get<std::string>("location", "remote"));
95-
if (config.count("localMachines") > 0) {
96-
for (const auto& [key, value] : config.get_child("localMachines")) {
93+
bool multinodeSetup = taskTree.find("location") != taskTree.not_found();
94+
ts.location = taskLocationFromString.at(taskTree.get<std::string>("location", "remote"));
95+
if (taskTree.count("localMachines") > 0) {
96+
for (const auto& [key, value] : taskTree.get_child("localMachines")) {
9797
ts.localMachines.emplace_back(value.get_value<std::string>());
9898
}
9999
}
100-
if (multinodeSetup && config.count("remoteMachine") > 0) {
100+
if (multinodeSetup && taskTree.count("remoteMachine") > 0) {
101101
ILOG(Warning, Trace)
102102
<< "No remote machine was specified for a multinode QC setup."
103103
" This is fine if running with AliECS, but it will fail in standalone mode."
104104
<< ENDM;
105105
}
106-
ts.remoteMachine = config.get<std::string>("remoteMachine", ts.remoteMachine);
107-
if (multinodeSetup && config.count("remotePort") > 0) {
106+
ts.remoteMachine = taskTree.get<std::string>("remoteMachine", ts.remoteMachine);
107+
if (multinodeSetup && taskTree.count("remotePort") > 0) {
108108
ILOG(Warning, Trace)
109109
<< "No remote port was specified for a multinode QC setup."
110110
" This is fine if running with AliECS, but it might fail in standalone mode."
111111
<< ENDM;
112112
}
113-
ts.remotePort = config.get<uint16_t>("remotePort", ts.remotePort);
114-
ts.localControl = config.get<std::string>("localControl", ts.localControl);
115-
ts.mergingMode = config.get<std::string>("mergingMode", ts.mergingMode);
116-
ts.mergerCycleMultiplier = config.get<int>("mergerCycleMultiplier", ts.mergerCycleMultiplier);
113+
ts.remotePort = taskTree.get<uint16_t>("remotePort", ts.remotePort);
114+
ts.localControl = taskTree.get<std::string>("localControl", ts.localControl);
115+
ts.mergingMode = taskTree.get<std::string>("mergingMode", ts.mergingMode);
116+
ts.mergerCycleMultiplier = taskTree.get<int>("mergerCycleMultiplier", ts.mergerCycleMultiplier);
117117

118118
return ts;
119119
}
120120

121-
DataSourceSpec InfrastructureSpecReader::readDataSourceSpec(const boost::property_tree::ptree& dataSourceSpec,
122-
const std::string& configurationSource)
121+
DataSourceSpec InfrastructureSpecReader::readDataSourceSpec(const boost::property_tree::ptree& dataSourceTree,
122+
const boost::property_tree::ptree& wholeTree)
123123
{
124124
static std::unordered_map<std::string, DataSourceType> const dataSourceTypeFromString = {
125125
// fixme: the convention is inconsistent and it should be fixed in coordination with configuration files
@@ -133,30 +133,30 @@ DataSourceSpec InfrastructureSpecReader::readDataSourceSpec(const boost::propert
133133
};
134134

135135
DataSourceSpec dss;
136-
dss.type = dataSourceTypeFromString.at(dataSourceSpec.get<std::string>("type"));
136+
dss.type = dataSourceTypeFromString.at(dataSourceTree.get<std::string>("type"));
137137

138138
switch (dss.type) {
139139
case DataSourceType::DataSamplingPolicy: {
140-
auto name = dataSourceSpec.get<std::string>("name");
140+
auto name = dataSourceTree.get<std::string>("name");
141141
dss.typeSpecificParams.insert({ "name", name });
142-
dss.inputs = DataSampling::InputSpecsForPolicy(configurationSource, name); //fixme: add a method which takes a ptree, then i can remove configurationSource
142+
dss.inputs = DataSampling::InputSpecsForPolicy(wholeTree.get_child("dataSamplingPolicies"), name);
143143
break;
144144
}
145145
case DataSourceType::Direct: {
146-
dss.typeSpecificParams.insert({ "query", dataSourceSpec.get<std::string>("query") });
147-
auto inputsQuery = dataSourceSpec.get<std::string>("query");
146+
dss.typeSpecificParams.insert({ "query", dataSourceTree.get<std::string>("query") });
147+
auto inputsQuery = dataSourceTree.get<std::string>("query");
148148
dss.inputs = DataDescriptorQueryBuilder::parse(inputsQuery.c_str());
149149
break;
150150
}
151151
case DataSourceType::Task: // todo all below
152152
case DataSourceType::PostProcessingTask:
153153
case DataSourceType::Check:
154154
case DataSourceType::Aggregator:
155-
dss.typeSpecificParams.insert({ "name", dataSourceSpec.get<std::string>("name") });
155+
dss.typeSpecificParams.insert({ "name", dataSourceTree.get<std::string>("name") });
156156
break;
157157
case DataSourceType::ExternalTask:
158-
dss.typeSpecificParams.insert({ "name", dataSourceSpec.get<std::string>("name") });
159-
dss.typeSpecificParams.insert({ "query", dataSourceSpec.get<std::string>("query") });
158+
dss.typeSpecificParams.insert({ "name", dataSourceTree.get<std::string>("name") });
159+
dss.typeSpecificParams.insert({ "query", dataSourceTree.get<std::string>("query") });
160160
break;
161161
case DataSourceType::Invalid:
162162
// todo: throw?

Framework/src/runAdvanced.cxx

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -38,11 +38,8 @@
3838
/// \endcode
3939

4040
#include <Framework/CompletionPolicyHelpers.h>
41-
#include <Framework/DataSpecUtils.h>
4241
#include <DataSampling/DataSampling.h>
4342
#include "QualityControl/InfrastructureGenerator.h"
44-
#include "QualityControl/QcInfoLogger.h"
45-
#include "QualityControl/AdvancedWorkflow.h"
4643

4744
using namespace o2;
4845
using namespace o2::framework;
@@ -73,11 +70,17 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
7370
}
7471

7572
#include <Framework/runDataProcessing.h>
73+
#include <Framework/DataSpecUtils.h>
74+
#include <Configuration/ConfigurationFactory.h>
75+
#include <Configuration/ConfigurationInterface.h>
76+
#include "QualityControl/QcInfoLogger.h"
77+
#include "QualityControl/AdvancedWorkflow.h"
7678

7779
using namespace o2;
7880
using namespace o2::header;
7981
using namespace o2::quality_control::core;
8082
using SubSpecificationType = o2::header::DataHeader::SubSpecificationType;
83+
using namespace o2::configuration;
8184

8285
WorkflowSpec defineDataProcessing(ConfigContext const& config)
8386
{
@@ -93,7 +96,9 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config)
9396
WorkflowSpec specs = getFullProcessingTopology();
9497

9598
if (!noQC) {
96-
DataSampling::GenerateInfrastructure(specs, qcConfigurationSource);
99+
auto configInterface = ConfigurationFactory::getConfiguration(qcConfigurationSource);
100+
auto dataSamplingTree = configInterface->getRecursive("dataSamplingPolicies");
101+
DataSampling::GenerateInfrastructure(specs, dataSamplingTree);
97102
// Generation of the remote QC topology (for the QC servers)
98103
quality_control::generateStandaloneInfrastructure(specs, qcConfigurationSource);
99104
}

Framework/src/runBasic.cxx

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -68,6 +68,7 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
6868

6969
#include <Framework/runDataProcessing.h>
7070
#include <Configuration/ConfigurationFactory.h>
71+
#include <Configuration/ConfigurationInterface.h>
7172

7273
#include "QualityControl/CheckRunner.h"
7374
#include "QualityControl/InfrastructureGenerator.h"
@@ -80,6 +81,7 @@ std::string getConfigPath(const ConfigContext& config);
8081

8182
using namespace o2;
8283
using namespace o2::framework;
84+
using namespace o2::configuration;
8385
using namespace o2::quality_control::checker;
8486
using namespace std::chrono;
8587

@@ -96,7 +98,9 @@ WorkflowSpec defineDataProcessing(const ConfigContext& config)
9698
ILOG(Info, Ops) << "Using config file '" << qcConfigurationSource << "'" << ENDM;
9799

98100
// Generation of Data Sampling infrastructure
99-
DataSampling::GenerateInfrastructure(specs, qcConfigurationSource);
101+
auto configInterface = ConfigurationFactory::getConfiguration(qcConfigurationSource);
102+
auto dataSamplingTree = configInterface->getRecursive("dataSamplingPolicies");
103+
DataSampling::GenerateInfrastructure(specs, dataSamplingTree);
100104

101105
// Generation of the QC topology (one task, one checker in this case)
102106
quality_control::generateStandaloneInfrastructure(specs, qcConfigurationSource);

0 commit comments

Comments
 (0)