Skip to content

Commit 9b293d6

Browse files
knopers8Barthelemy
authored andcommitted
Advanced random data producer (#263)
1 parent 83e4ad9 commit 9b293d6

5 files changed

Lines changed: 176 additions & 62 deletions

File tree

Framework/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ add_library(QualityControl
5757
src/PostProcessingFactory.cxx
5858
src/PostProcessingConfig.cxx
5959
src/PostProcessingInterface.cxx
60-
src/DummyDatabase.cxx)
60+
src/DummyDatabase.cxx
61+
src/DataProducer.cxx)
6162

6263
if(ENABLE_MYSQL)
6364
target_sources(QualityControl PRIVATE src/MySqlDatabase.cxx)
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
///
12+
/// \file DataProducer.h
13+
/// \author Piotr Konopka
14+
///
15+
16+
#ifndef QUALITYCONTROL_DATAPRODUCER_H
17+
#define QUALITYCONTROL_DATAPRODUCER_H
18+
19+
#include <Framework/DataProcessorSpec.h>
20+
21+
namespace o2::quality_control::core
22+
{
23+
24+
/// \brief Returns a random data producer specification which publishes on {"TST", "RAWDATA", <index>}
25+
///
26+
/// \param minSize Minimum size of a message in bytes
27+
/// \param maxSize Maximum size of a message in bytes
28+
/// \param rate How much messages to produce in one second
29+
/// \param fill Should it fill messages with random data
30+
/// \param index SubSpecification of the data producer (useful when more than one needed)
31+
/// \return A random data producer specification
32+
framework::DataProcessorSpec getDataProducerSpec(size_t minSize, size_t maxSize, double rate, bool fill = true, size_t index = 0, std::string monitoringUrl = "");
33+
34+
/// \brief Returns an algorithm generating random messages
35+
///
36+
/// \param output Origin, Description and SubSpecification of data to be produced
37+
/// \param minSize Minimum size of a message in bytes
38+
/// \param maxSize Maximum size of a message in bytes
39+
/// \param rate How much messages to produce in one second
40+
/// \param fill Should it fill messages with random data
41+
/// \return A random data producer algorithm
42+
framework::AlgorithmSpec getDataProducerAlgorithm(framework::ConcreteDataMatcher output, size_t minSize, size_t maxSize, double rate, bool fill = true, std::string monitoringUrl = "");
43+
44+
} // namespace o2::quality_control::core
45+
46+
#endif //QUALITYCONTROL_DATAPRODUCER_H

Framework/src/DataProducer.cxx

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
///
12+
/// \file DataProducer.cxx
13+
/// \author Piotr Konopka
14+
///
15+
#include "QualityControl/DataProducer.h"
16+
17+
#include <random>
18+
#include <Common/Timer.h>
19+
#include <Monitoring/MonitoringFactory.h>
20+
21+
using namespace o2::framework;
22+
using namespace o2::monitoring;
23+
24+
using SubSpec = o2::header::DataHeader::SubSpecificationType;
25+
using namespace AliceO2::Common;
26+
27+
namespace o2::quality_control::core
28+
{
29+
30+
DataProcessorSpec getDataProducerSpec(size_t minSize, size_t maxSize, double rate, bool fill, size_t index, std::string monitoringUrl)
31+
{
32+
return DataProcessorSpec{
33+
"producer-" + std::to_string(index),
34+
Inputs{},
35+
Outputs{
36+
{ { "out" }, "TST", "RAWDATA", static_cast<SubSpec>(index) } },
37+
getDataProducerAlgorithm({ "TST", "RAWDATA", static_cast<SubSpec>(index) }, minSize, maxSize, rate, fill, monitoringUrl)
38+
};
39+
}
40+
41+
framework::AlgorithmSpec
42+
getDataProducerAlgorithm(ConcreteDataMatcher output, size_t minSize, size_t maxSize, double rate, bool fill,
43+
std::string monitoringUrl)
44+
{
45+
return AlgorithmSpec{
46+
[=](InitContext&) {
47+
// this is the initialization code
48+
std::default_random_engine generator(time(nullptr));
49+
std::shared_ptr<Timer> timer = nullptr;
50+
51+
int messageCounter = 0;
52+
std::shared_ptr<monitoring::Monitoring> collector;
53+
if (!monitoringUrl.empty()) {
54+
collector = MonitoringFactory::Get(monitoringUrl);
55+
collector->enableProcessMonitoring();
56+
}
57+
58+
// after the initialization, we return the processing callback
59+
return [=](ProcessingContext& processingContext) mutable {
60+
// everything inside this lambda function is invoked in a loop, because it this Data Processor has no inputs
61+
62+
// setting up the timer
63+
if (!timer) {
64+
timer = std::make_shared<Timer>();
65+
timer->reset(static_cast<int>(1000000.0 / rate));
66+
}
67+
// keeping the message rate
68+
double timeToSleep = timer->getRemainingTime();
69+
if (timeToSleep > 0) {
70+
usleep(timeToSleep * 1000000.0);
71+
}
72+
timer->increment();
73+
74+
// generating data
75+
size_t length = minSize + (generator() % (maxSize - minSize));
76+
auto data = processingContext.outputs().make<char>({ output.origin, output.description, output.subSpec },
77+
length);
78+
if (fill) {
79+
for (auto&& item : data) {
80+
item = static_cast<char>(generator());
81+
}
82+
}
83+
84+
if (collector) {
85+
collector->send({ messageCounter++, "Data_producer_" + std::to_string(output.subSpec) + "_message_" },
86+
DerivedMetricMode::RATE);
87+
}
88+
};
89+
}
90+
};
91+
}
92+
93+
} // namespace o2::quality_control::core

Framework/src/runBasic.cxx

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
6262
ConfigParamSpec{ "no-data-sampling", VariantType::Bool, false, { "Skips data sampling, connects directly the task to the producer." } });
6363
}
6464

65-
#include <random>
6665
#include <string>
6766

6867
#include <Framework/runDataProcessing.h>
@@ -71,6 +70,7 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
7170
#include "QualityControl/InfrastructureGenerator.h"
7271
#include "QualityControl/runnerUtils.h"
7372
#include "QualityControl/ExamplePrinterSpec.h"
73+
#include "QualityControl/DataProducer.h"
7474

7575
std::string getConfigPath(const ConfigContext& config);
7676

@@ -79,38 +79,12 @@ using namespace o2::framework;
7979
using namespace o2::quality_control::checker;
8080
using namespace std::chrono;
8181

82-
// clang-format off
8382
WorkflowSpec defineDataProcessing(const ConfigContext& config)
8483
{
8584
WorkflowSpec specs;
8685

8786
// The producer to generate some data in the workflow
88-
DataProcessorSpec producer{
89-
"producer",
90-
Inputs{},
91-
Outputs{
92-
{ "TST", "RAWDATA", 0 } },
93-
AlgorithmSpec{
94-
(AlgorithmSpec::InitCallback)[](InitContext&){
95-
// this is the initialization code
96-
std::default_random_engine generator(11);
97-
98-
// after the initialization, we return the processing callback
99-
return (AlgorithmSpec::ProcessCallback)[generator](ProcessingContext & processingContext) mutable
100-
{
101-
// everything inside this lambda function is invoked in a loop, because it this Data Processor has no inputs
102-
usleep(100000);
103-
104-
size_t length = generator() % 10000;
105-
auto data = processingContext.outputs().make<char>(Output{ "TST", "RAWDATA" }, length);
106-
for (auto&& item : data) {
107-
item = static_cast<char>(generator());
108-
}
109-
};
110-
}
111-
}
112-
};
113-
87+
DataProcessorSpec producer = getDataProducerSpec(1, 10000, 10);
11488
specs.push_back(producer);
11589

11690
// Path to the config file
@@ -135,7 +109,6 @@ WorkflowSpec defineDataProcessing(const ConfigContext& config)
135109

136110
return specs;
137111
}
138-
// clang-format on
139112

140113
// TODO merge this with the one from runReadout.cxx
141114
std::string getConfigPath(const ConfigContext& config)

Framework/src/runDataProducer.cxx

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,50 +19,51 @@
1919
/// \code{.sh}
2020
/// o2-qc-run-producer | o2-qc --config json://${QUALITYCONTROL_ROOT}/etc/basic.json
2121
/// \endcode
22+
/// Check out the help message to see how to configure data rate and message size.
2223
///
2324
/// If you have glfw installed, you should see a window with the workflow visualization and sub-windows for each Data
2425
/// Processor where their logs can be seen. The processing will continue until the main window it is closed. Regardless
2526
/// of glfw being installed or not, in the terminal all the logs will be shown as well.
2627

27-
#include <random>
28-
#include <Framework/runDataProcessing.h>
28+
#include <vector>
29+
#include <Framework/ConfigParamSpec.h>
2930

3031
using namespace o2;
3132
using namespace o2::framework;
3233

33-
// clang-format off
34-
WorkflowSpec defineDataProcessing(const ConfigContext&)
34+
void customize(std::vector<ConfigParamSpec>& workflowOptions)
3535
{
36-
WorkflowSpec specs;
36+
workflowOptions.push_back(
37+
ConfigParamSpec{ "min-size", VariantType::Int, 1, { "Minimum size in bytes of produced messages." } });
38+
workflowOptions.push_back(
39+
ConfigParamSpec{ "max-size", VariantType::Int, 10000, { "Maximum size in bytes of produced messages." } });
40+
workflowOptions.push_back(
41+
ConfigParamSpec{ "empty", VariantType::Bool, false, { "Don't fill messages with random data." } });
42+
workflowOptions.push_back(
43+
ConfigParamSpec{ "message-rate", VariantType::Double, 10.0, { "Rate of messages per second." } });
44+
workflowOptions.push_back(
45+
ConfigParamSpec{ "producers", VariantType::Int, 1, { "Number of producers. Each will have unique SubSpec, counting from 0." } });
46+
workflowOptions.push_back(
47+
ConfigParamSpec{ "monitoring-url", VariantType::String, "", { "URL of the Monitoring backend" } });
48+
}
3749

38-
// The producer to generate some data in the workflow
39-
DataProcessorSpec producer{
40-
"producer",
41-
Inputs{},
42-
Outputs{
43-
{ "TST", "RAWDATA", 0 } },
44-
AlgorithmSpec{
45-
(AlgorithmSpec::InitCallback)[](InitContext&){
46-
// this is the initialization code
47-
std::default_random_engine generator(11);
50+
#include <Framework/runDataProcessing.h>
51+
#include "QualityControl/DataProducer.h"
4852

49-
// after the initialization, we return the processing callback
50-
return (AlgorithmSpec::ProcessCallback)[generator](ProcessingContext & processingContext) mutable
51-
{
52-
// everything inside this lambda function is invoked in a loop, because it this Data Processor has no inputs
53-
usleep(100000);
53+
using namespace o2::quality_control::core;
5454

55-
size_t length = generator() % 10000;
56-
auto data = processingContext.outputs().make<char>(Output{ "TST", "RAWDATA" }, length);
57-
for (auto&& item : data) {
58-
item = static_cast<char>(generator());
59-
}
60-
};
61-
}
62-
}
63-
};
64-
specs.push_back(producer);
55+
WorkflowSpec defineDataProcessing(const ConfigContext& config)
56+
{
57+
size_t minSize = config.options().get<int>("min-size");
58+
size_t maxSize = config.options().get<int>("max-size");
59+
bool fill = !config.options().get<bool>("empty");
60+
double rate = config.options().get<double>("message-rate");
61+
size_t producers = config.options().get<int>("producers");
62+
std::string monitoringUrl = config.options().get<std::string>("monitoring-url");
6563

64+
WorkflowSpec specs;
65+
for (size_t i = 0; i < producers; i++) {
66+
specs.push_back(getDataProducerSpec(minSize, maxSize, rate, fill, i, monitoringUrl));
67+
}
6668
return specs;
67-
}
68-
// clang-format on
69+
}

0 commit comments

Comments
 (0)