Commit 165a40c

knopers8 and Barthelemy authored
[QC-589] Reduce the amount of configuration needed for multinode setups (#715)
* Do not require remote machine and port when running with AliECS
* One less indentation in infrastructure generator
* Allow for optional return type for DataSampling::PortForPolicy
* Expand and update the multinode setups documentation
* trigger rebuild
* Update InfrastructureGenerator.cxx
* Wrong if condition

Co-authored-by: Barthélémy von Haller <barthelemy.von.haller@gmail.com>
Co-authored-by: Barthélémy von Haller <barthelemy.von.haller@cern.ch>
1 parent 92c372d commit 165a40c

2 files changed

Lines changed: 124 additions & 83 deletions

Framework/src/InfrastructureGenerator.cxx

Lines changed: 92 additions & 62 deletions
@@ -48,6 +48,9 @@ using SubSpec = o2::header::DataHeader::SubSpecificationType;
4848
namespace o2::quality_control::core
4949
{
5050

51+
const char* defaultRemotePort = "36543";
52+
uint16_t defaultPolicyPort = 42349;
53+
5154
framework::WorkflowSpec InfrastructureGenerator::generateStandaloneInfrastructure(std::string configurationSource)
5255
{
5356
WorkflowSpec workflow;
@@ -88,46 +91,63 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string co
8891
}
8992

9093
for (const auto& [taskName, taskConfig] : config->getRecursive("qc.tasks")) {
91-
if (taskConfig.get<bool>("active")) {
92-
if (taskConfig.get<std::string>("location") == "local") {
94+
if (!taskConfig.get<bool>("active")) {
95+
ILOG(Info, Devel) << "Task " << taskName << " is disabled, ignoring." << ENDM;
96+
continue;
97+
}
9398

94-
if (taskConfig.get_child("localMachines").empty()) {
95-
throw std::runtime_error("No local machines specified for task " + taskName + " in its configuration");
96-
}
99+
if (taskConfig.get<std::string>("location") == "local") {
100+
if (taskConfig.get_child("localMachines").empty()) {
101+
throw std::runtime_error("No local machines specified for task " + taskName + " in its configuration");
102+
}
97103

98-
size_t id = 1;
99-
for (const auto& machine : taskConfig.get_child("localMachines")) {
100-
// We spawn a task and proxy only if we are on the right machine.
101-
if (machine.second.get<std::string>("") == host) {
102-
// Generate QC Task Runner
103-
bool needsResetAfterCycle = taskConfig.get<std::string>("mergingMode", "delta") == "delta";
104-
workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, id, needsResetAfterCycle));
105-
// Generate an output proxy
106-
// These should be removed when we are able to declare dangling output in normal DPL devices
107-
generateLocalTaskLocalProxy(workflow, id, taskName, taskConfig.get<std::string>("remoteMachine"), taskConfig.get<std::string>("remotePort"));
108-
break;
104+
size_t id = 1;
105+
for (const auto& machine : taskConfig.get_child("localMachines")) {
106+
// We spawn a task and proxy only if we are on the right machine.
107+
if (machine.second.get<std::string>("") == host) {
108+
// Generate QC Task Runner
109+
bool needsResetAfterCycle = taskConfig.get<std::string>("mergingMode", "delta") == "delta";
110+
workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, id, needsResetAfterCycle));
111+
// Generate an output proxy
112+
// These should be removed when we are able to declare dangling output in normal DPL devices
113+
auto remoteMachine = taskConfig.get_optional<std::string>("remoteMachine");
114+
if (!remoteMachine.has_value()) {
115+
ILOG(Warning, Devel)
116+
<< "No remote machine was specified for a multinode QC setup."
117+
" This is fine if running with AliECS, but it will fail in standalone mode."
118+
<< ENDM;
109119
}
110-
id++;
111-
}
112-
} else // if (taskConfig.get<std::string>("location") == "remote")
113-
{
114-
// Collecting Data Sampling Policies
115-
auto dataSourceTree = taskConfig.get_child("dataSource");
116-
std::string type = dataSourceTree.get<std::string>("type");
117-
if (type == "dataSamplingPolicy") {
118-
samplingPoliciesUsed.insert(dataSourceTree.get<std::string>("name"));
119-
} else if (type == "direct") {
120-
throw std::runtime_error("Configuration error: Remote QC tasks such as " + taskName + " cannot use direct data sources");
121-
} else {
122-
throw std::runtime_error("Configuration error: dataSource type unknown : " + type);
120+
auto remotePort = taskConfig.get_optional<std::string>("remotePort");
121+
if (!remotePort.has_value()) {
122+
ILOG(Warning, Devel)
123+
<< "No remote port was specified for a multinode QC setup."
124+
" This is fine if running with AliECS, but it might fail in standalone mode."
125+
<< ENDM;
126+
}
127+
generateLocalTaskLocalProxy(workflow, id, taskName, remoteMachine.value_or("any"), remotePort.value_or(defaultRemotePort));
128+
break;
123129
}
130+
id++;
131+
}
132+
} else // if (taskConfig.get<std::string>("location") == "remote")
133+
{
134+
// Collecting Data Sampling Policies
135+
auto dataSourceTree = taskConfig.get_child("dataSource");
136+
std::string type = dataSourceTree.get<std::string>("type");
137+
if (type == "dataSamplingPolicy") {
138+
samplingPoliciesUsed.insert(dataSourceTree.get<std::string>("name"));
139+
} else if (type == "direct") {
140+
throw std::runtime_error("Configuration error: Remote QC tasks such as " + taskName + " cannot use direct data sources");
141+
} else {
142+
throw std::runtime_error("Configuration error: dataSource type unknown : " + type);
124143
}
125144
}
126145
}
127146

128147
// Creating Data Sampling Policies proxies
129148
for (const auto& policyName : samplingPoliciesUsed) {
130-
std::string port = std::to_string(DataSampling::PortForPolicy(config.get(), policyName));
149+
// todo: leave only the new way once the return type is changed
150+
std::string port = std::to_string(std::optional<uint16_t>(DataSampling::PortForPolicy(config.get(), policyName)).value_or(defaultPolicyPort));
131151
Inputs inputSpecs = DataSampling::InputSpecsForPolicy(config.get(), policyName);
132152

133153
std::vector<std::string> machines = DataSampling::MachinesForPolicy(config.get(), policyName);
@@ -157,43 +177,51 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur
157177
if (config->getRecursive("qc").count("tasks")) {
158178
TaskRunnerFactory taskRunnerFactory;
159179
for (const auto& [taskName, taskConfig] : config->getRecursive("qc.tasks")) {
160-
if (taskConfig.get<bool>("active", true)) {
161-
162-
if (taskConfig.get<std::string>("location") == "local") {
163-
// if tasks are LOCAL, generate input proxies + mergers + checkers
180+
if (!taskConfig.get<bool>("active", true)) {
181+
ILOG(Info, Devel) << "Task " << taskName << " is disabled, ignoring." << ENDM;
182+
continue;
183+
}
164184

165-
size_t numberOfLocalMachines = taskConfig.get_child("localMachines").size() > 1 ? taskConfig.get_child("localMachines").size() : 1;
166-
// Generate an input proxy
167-
// These should be removed when we are able to declare dangling inputs in normal DPL devices
168-
generateLocalTaskRemoteProxy(workflow, taskName, numberOfLocalMachines, taskConfig.get<std::string>("remotePort"));
185+
if (taskConfig.get<std::string>("location") == "local") {
186+
// if tasks are LOCAL, generate input proxies + mergers + checkers
187+
188+
size_t numberOfLocalMachines = taskConfig.get_child("localMachines").size() > 1 ? taskConfig.get_child("localMachines").size() : 1;
189+
// Generate an input proxy
190+
// These should be removed when we are able to declare dangling inputs in normal DPL devices
191+
auto remotePort = taskConfig.get_optional<std::string>("remotePort");
192+
if (!remotePort.has_value()) {
193+
ILOG(Warning, Devel) << "No remote port was specified for a multinode QC setup."
194+
" This is fine if running with AliECS, but it might fail in standalone mode."
195+
<< ENDM;
196+
}
197+
generateLocalTaskRemoteProxy(workflow, taskName, numberOfLocalMachines, remotePort.value_or(defaultRemotePort));
169198

170-
generateMergers(workflow, taskName, numberOfLocalMachines,
171-
taskConfig.get<double>("cycleDurationSeconds"),
172-
taskConfig.get<std::string>("mergingMode", "delta"));
199+
generateMergers(workflow, taskName, numberOfLocalMachines,
200+
taskConfig.get<double>("cycleDurationSeconds"),
201+
taskConfig.get<std::string>("mergingMode", "delta"));
173202

174-
} else if (taskConfig.get<std::string>("location") == "remote") {
203+
} else if (taskConfig.get<std::string>("location") == "remote") {
175204

176-
// -- if tasks are REMOTE, generate dispatcher proxies + tasks + checkers
177-
// (for the time being we don't foresee parallel tasks on QC servers, so no mergers here)
205+
// -- if tasks are REMOTE, generate dispatcher proxies + tasks + checkers
206+
// (for the time being we don't foresee parallel tasks on QC servers, so no mergers here)
178207

179-
// fixme: ideally we should check if we are on the right remote machine, but now we support only n -> 1 setups,
180-
// so there is no point. Also, I expect that we should be able to generate one big topology or its parts
181-
// and we would place it among QC servers using AliECS, not by configuration files.
208+
// fixme: ideally we should check if we are on the right remote machine, but now we support only n -> 1 setups,
209+
// so there is no point. Also, I expect that we should be able to generate one big topology or its parts
210+
// and we would place it among QC servers using AliECS, not by configuration files.
182211

183-
// Collecting Data Sampling Policies
184-
auto dataSourceTree = taskConfig.get_child("dataSource");
185-
std::string type = dataSourceTree.get<std::string>("type");
186-
if (type == "dataSamplingPolicy") {
187-
samplingPoliciesUsed.insert(dataSourceTree.get<std::string>("name"));
188-
} else if (type == "direct") {
189-
throw std::runtime_error("Configuration error: Remote QC tasks such as " + taskName + " cannot use direct data sources");
190-
} else {
191-
throw std::runtime_error("Configuration error: dataSource type unknown : " + type);
192-
}
193-
194-
// Creating the remote task
195-
workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, 0));
212+
// Collecting Data Sampling Policies
213+
auto dataSourceTree = taskConfig.get_child("dataSource");
214+
std::string type = dataSourceTree.get<std::string>("type");
215+
if (type == "dataSamplingPolicy") {
216+
samplingPoliciesUsed.insert(dataSourceTree.get<std::string>("name"));
217+
} else if (type == "direct") {
218+
throw std::runtime_error("Configuration error: Remote QC tasks such as " + taskName + " cannot use direct data sources");
219+
} else {
220+
throw std::runtime_error("Configuration error: dataSource type unknown : " + type);
196221
}
222+
223+
// Creating the remote task
224+
workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, 0));
197225
}
198226
}
199227
}
@@ -202,7 +230,9 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur
202230
for (const auto& policyName : samplingPoliciesUsed) {
203231
// todo now we have to generate one proxy per local machine and policy, because of the proxy limitations.
204232
// Use one proxy per policy when it is possible.
205-
std::string port = std::to_string(DataSampling::PortForPolicy(config.get(), policyName));
233+
234+
// todo: leave only the new way once the return type is changed
235+
std::string port = std::to_string(std::optional<uint16_t>(DataSampling::PortForPolicy(config.get(), policyName)).value_or(defaultPolicyPort));
206236
Outputs outputSpecs = DataSampling::OutputSpecsForPolicy(config.get(), policyName);
207237
std::vector<std::string> machines = DataSampling::MachinesForPolicy(config.get(), policyName);
208238
for (const auto& machine : machines) {
@@ -428,7 +458,7 @@ vector<OutputSpec> InfrastructureGenerator::generateCheckRunners(framework::Work
428458
break;
429459
}
430460
}
431-
if (!isStored) {
461+
if (!isStored) { // fixme: statement is always true
432462
// If there is no Check for a given input, create a candidate for a sink device
433463
InputNames singleEntry{ label };
434464
// Init empty Check vector to appear in the next step
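
The two fallback patterns introduced in `InfrastructureGenerator.cxx` above can be sketched in isolation. This is a minimal illustration, not the QC code itself: `TaskConfig`, `getOptional`, `ProxyEndpoint`, `resolveEndpoint` and `policyPortAsString` are hypothetical names, and a plain `std::map` stands in for the task configuration tree. Only the defaults (`"any"`, `"36543"`, `42349`) are taken from the diff.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <string>

// Hypothetical stand-in for the configuration subtree of one QC task.
using TaskConfig = std::map<std::string, std::string>;

// Default values as they appear in the diff.
const char* defaultRemotePort = "36543";
uint16_t defaultPolicyPort = 42349;

// Hypothetical holder for the proxy address resolved from the configuration.
struct ProxyEndpoint {
  std::string machine;
  std::string port;
};

// Returns the value stored under `key`, or std::nullopt if the key is absent.
std::optional<std::string> getOptional(const TaskConfig& config, const std::string& key)
{
  auto it = config.find(key);
  if (it == config.end()) {
    return std::nullopt;
  }
  return it->second;
}

// Pattern 1: a missing parameter is not an error, it falls back to a default.
ProxyEndpoint resolveEndpoint(const TaskConfig& config)
{
  auto remoteMachine = getOptional(config, "remoteMachine");
  auto remotePort = getOptional(config, "remotePort");
  return { remoteMachine.value_or("any"), remotePort.value_or(defaultRemotePort) };
}

// Pattern 2: wrapping the argument in std::optional lets the same expression
// accept either a plain uint16_t or a std::optional<uint16_t>, so a call site
// keeps compiling before and after a return type is changed.
template <typename PortOrOptional>
std::string policyPortAsString(PortOrOptional port)
{
  return std::to_string(std::optional<uint16_t>(port).value_or(defaultPolicyPort));
}

// Usage example: call this from main() to exercise both patterns.
void demonstrateFallbacks()
{
  TaskConfig withoutRemoteSettings; // e.g. a setup where AliECS provides them
  ProxyEndpoint endpoint = resolveEndpoint(withoutRemoteSettings);
  assert(endpoint.machine == "any" && endpoint.port == "36543");
  assert(policyPortAsString(uint16_t{ 30333 }) == "30333");
  assert(policyPortAsString(std::optional<uint16_t>{}) == "42349");
}
```

The sketch leaves out the warnings that the real code logs when a parameter is missing; it only shows how the defaults are selected.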

doc/Advanced.md

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,9 @@ locally produced Monitor Objects should be merged on QC servers and then have Ch
132132
By **remote QC tasks** we mean those which run on QC servers (**remote machines**), while **local QC Tasks**
133133
run on FLPs and EPNs (**local machines**).
134134

135-
While it is responsibility of the run operators to run all the processing topologies during the
136-
data taking, here we show how to achieve such multinode workflows on development setups, running
137-
them just with DPL driver. Note that for now we support cases with one or more local machines,
138-
but just only one remote machine.
135+
Setting up a multinode setup to run standalone or with AliECS requires a different number of parameters,
136+
as some of them are overwritten by AliECS anyway. Such parameters are marked accordingly. Please note
137+
that for now we support cases with one or more local machines, but only one remote machine.
139138

140139
In our example, we assume having two local processing nodes (`localnode1`, `localnode2`) and one
141140
QC node (`qcnode`). There are two types of QC Tasks declared:
@@ -145,7 +144,9 @@ QC node (`qcnode`). There are two types of QC Tasks declared:
145144
`localnode2` only. Mergers are not needed in this case, but there is a process running Checks against
146145
Monitor Objects generated by this Task.
147146

148-
We use the `SkeletonTask` class for both, but any Task can be used of course. Should a Task be local, all its `MonitorObject`s need to be mergeable - they should be one of the mergeable ROOT types (histograms, TTrees) or inherit [MergeInterface](https://github.com/AliceO2Group/AliceO2/blob/dev/Utilities/Mergers/include/Mergers/MergeInterface.h).
147+
We use the `SkeletonTask` class for both, but any Task can be used of course. Should a Task be local,
148+
all its `MonitorObject`s need to be mergeable - they should be one of the mergeable ROOT types (histograms, TTrees)
149+
or inherit [MergeInterface](https://github.com/AliceO2Group/AliceO2/blob/dev/Utilities/Mergers/include/Mergers/MergeInterface.h).
149150

150151
These are the steps to follow to get a multinode setup:
151152

@@ -165,18 +166,21 @@ added:
165166
"localnode1",
166167
"localnode2"
167168
],
168-
"remoteMachine": "qcnode",
169-
"remotePort": "30132",
170-
"mergingMode": "delta"
169+
"remoteMachine": "qcnode", "":"not needed with AliECS",
170+
"remotePort": "30132", "":"not needed with AliECS",
171+
"mergingMode": "delta", "":"if absent, delta is default"
171172
}
172173
},
173174
```
174175
List the local processing machines in the `localMachines` array. `remoteMachine` should contain the host name which
175-
will serve as a QC server and `remotePort` should be a port number on which Mergers will wait for upcoming MOs. Make
176-
sure it is not used by other service. If different QC Tasks are run in parallel, use separate ports for each. One
177-
also may choose the merging mode - `delta` is the default and recommended (tasks are reset after each cycle, so they
178-
send only updates), but if it is not feasible, Mergers may expect `entire` objects - tasks are not reset, they
179-
always send entire objects and the latest versions are combined in Mergers.
176+
will serve as a QC server and `remotePort` should be a port number on which Mergers will wait for upcoming MOs. Make
177+
sure it is not used by another service. If different QC Tasks are run in parallel, use separate ports for each. One
178+
may also choose the merging mode - `delta` is the default and recommended (tasks are reset after each cycle, so they
179+
send only updates), but if it is not feasible, Mergers may expect `entire` objects - tasks are not reset, they
180+
always send entire objects and the latest versions are combined in Mergers.
181+
182+
With the `delta` mode, one can cheat by specifying just one local machine name and referencing only that one later.
183+
This is not possible with `entire` mode, because then Mergers need identifiable data sources to merge objects correctly.
180184

181185
In case of a remote task, choosing `"remote"` option for the `"location"` parameter is enough.
182186

@@ -196,29 +200,28 @@ In case of a remote task, choosing `"remote"` option for the `"location"` parame
196200
}
197201
```
198202

199-
However in both cases, one has to specify the machines where data should be sampled, as below. If data should be
200-
published to external machines (with remote tasks), one has to add a local port number. Use separate ports for each
201-
Data Sampling Policy.
203+
If the task runs remotely, one has to specify the machines where data should be sampled and a local port number
204+
on which it is published to the external machines. Use separate ports for each Data Sampling Policy.
202205
```json
203206
{
204207
"dataSamplingPolicies": [
205208
...
206209
{
207210
"id": "rnd-little",
208211
"active": "true",
209-
"machines": [
212+
"machines": [ "","needed only for remote QC tasks",
210213
"localnode2"
211214
],
212-
"port": "30333"
215+
"port": "30333", "":"not needed with AliECS",
213216
...
214217
}
215218
]
216219
}
217220
```
218221
/
219222
2. Make sure that the firewalls are properly configured. If your machines block incoming/outgoing connections by
220-
default, you can add these rules to the firewall (run as sudo). Consider enabling only concrete ports or a small
221-
range of those.
223+
default, you can add these rules to the firewall (run as sudo). Consider enabling only concrete ports or a small
224+
range of those.
222225

223226
```
224227
# localnode1 and localnode2 :
@@ -230,6 +233,11 @@ iptables -I OUTPUT -p tcp -m conntrack --ctstate NEW,ESTABLISHED -d localnode1 -
230233
iptables -I INPUT -p tcp -m conntrack --ctstate NEW,ESTABLISHED -s localnode2 -j ACCEPT
231234
iptables -I OUTPUT -p tcp -m conntrack --ctstate NEW,ESTABLISHED -d localnode2 -j ACCEPT
232235
```
236+
If your network is isolated, you might consider disabling the firewall as an alternative. Be wary of the security risks.
237+
```
238+
systemctl stop firewalld # to disable until reboot
239+
systemctl disable firewalld # to disable permanently
240+
```
233241
234242
3. Install the same version of the QC software on each of these nodes. We cannot guarantee that different QC versions will talk to each other without problems. Also, make sure the configuration file that you will use is the same everywhere.
235243
@@ -246,7 +254,10 @@ o2-qc --config json:/${QUALITYCONTROL_ROOT}/etc/multiNode.json --remote
246254
If there are no problems, on QCG you should see the `example` histogram updated under the paths `qc/TST/MO/MultiNodeLocal`
247255
and `qc/TST/MO/MultiNodeRemote`, and corresponding Checks under the path `qc/TST/QO/`.
248256
249-
## Writing a DPL data producer
257+
When using AliECS, one has to generate workflow templates and upload them to the corresponding repository. Please
258+
contact the QC or AliECS developers to receive assistance or instruction on how to do that.
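
The difference between the `delta` and `entire` merging modes described in this section can be illustrated with a toy model. This is only a sketch under simplifying assumptions: `Histogram`, `DeltaMerger` and `EntireMerger` are hypothetical names, a vector of bin contents stands in for a Monitor Object, and the real Mergers are not implemented this way. It is meant to show why `entire` mode needs identifiable data sources while `delta` mode does not.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Toy Monitor Object: bin contents of a histogram.
using Histogram = std::vector<int>;

// Adds `source` into `target` bin by bin, growing `target` if needed.
void addInto(Histogram& target, const Histogram& source)
{
  if (target.size() < source.size()) {
    target.resize(source.size(), 0);
  }
  for (std::size_t i = 0; i < source.size(); ++i) {
    target[i] += source[i];
  }
}

// "delta": tasks are reset after each cycle and send only updates,
// so every received object is simply added to the running total.
class DeltaMerger
{
 public:
  void receive(const Histogram& delta) { addInto(mMerged, delta); }
  const Histogram& merged() const { return mMerged; }

 private:
  Histogram mMerged;
};

// "entire": tasks are never reset and always send entire objects, so only
// the latest version from each source may be counted. This is why the
// source of each object has to be identifiable.
class EntireMerger
{
 public:
  void receive(const std::string& source, const Histogram& entire) { mLatest[source] = entire; }
  Histogram merged() const
  {
    Histogram result;
    for (const auto& entry : mLatest) {
      addInto(result, entry.second);
    }
    return result;
  }

 private:
  std::map<std::string, Histogram> mLatest;
};

// Usage example: the same data taking seen by both mergers gives the same result.
void demonstrateMergingModes()
{
  DeltaMerger delta;
  delta.receive({ 1, 0 }); // localnode1, cycle 1
  delta.receive({ 0, 2 }); // localnode2, cycle 1
  delta.receive({ 1, 1 }); // localnode1, cycle 2 (update only)

  EntireMerger entire;
  entire.receive("localnode1", { 1, 0 });
  entire.receive("localnode2", { 0, 2 });
  entire.receive("localnode1", { 2, 1 }); // replaces the cycle 1 object

  assert(delta.merged() == entire.merged());
}
```

In the `delta` case the merger never looks at who sent an object, which is consistent with the remark above that a single local machine name is enough there.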
259+
260+
## Writing a DPL data producer
250261
251262
For your convenience, and although it does not lie within the QC scope, we would like to document how to write a simple data producer in the DPL. The DPL documentation can be found [here](https://github.com/AliceO2Group/AliceO2/blob/dev/Framework/Core/README.md) and for questions please head to the [forum](https://alice-talk.web.cern.ch/).
252263
