Skip to content

Commit 6c14a67

Browse files
authored
[QC-260] Support EndOfStream in QC (#280)
* Fix data producer cases when minSize == maxSize * max number of messages parameter for data-producer * Finish cycle upon an EndOfStream signal
1 parent 826bd42 commit 6c14a67

5 files changed

Lines changed: 61 additions & 20 deletions

File tree

Framework/include/QualityControl/DataProducer.h

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,30 @@ namespace o2::quality_control::core
2626
/// \param minSize Minimum size of a message in bytes
2727
/// \param maxSize Maximum size of a message in bytes
2828
/// \param rate How much messages to produce in one second
29-
/// \param fill Should it fill messages with random data
29+
/// \param amount How many messages should be produce in total (0 for inf). EndOfStream is sent at the end.
3030
/// \param index SubSpecification of the data producer (useful when more than one needed)
31+
/// \param monitoringUrl Where monitoring metrics should be sent
32+
/// \param fill Should it fill messages with random data
33+
///
3134
/// \return A random data producer specification
32-
framework::DataProcessorSpec getDataProducerSpec(size_t minSize, size_t maxSize, double rate, bool fill = true, size_t index = 0, std::string monitoringUrl = "");
35+
framework::DataProcessorSpec
36+
getDataProducerSpec(size_t minSize, size_t maxSize, double rate, uint64_t amount = 0, size_t index = 0,
37+
std::string monitoringUrl = "", bool fill = true);
3338

3439
/// \brief Returns an algorithm generating random messages
3540
///
3641
/// \param output Origin, Description and SubSpecification of data to be produced
3742
/// \param minSize Minimum size of a message in bytes
3843
/// \param maxSize Maximum size of a message in bytes
3944
/// \param rate How much messages to produce in one second
45+
/// \param amount How many messages should be produce in total (0 for inf). EndOfStream is sent at the end.
46+
/// \param monitoringUrl Where monitoring metrics should be sent
4047
/// \param fill Should it fill messages with random data
48+
///
4149
/// \return A random data producer algorithm
42-
framework::AlgorithmSpec getDataProducerAlgorithm(framework::ConcreteDataMatcher output, size_t minSize, size_t maxSize, double rate, bool fill = true, std::string monitoringUrl = "");
50+
framework::AlgorithmSpec
51+
getDataProducerAlgorithm(framework::ConcreteDataMatcher output, size_t minSize, size_t maxSize, double rate,
52+
uint64_t amount = 0, std::string monitoringUrl = "", bool fill = true);
4353

4454
} // namespace o2::quality_control::core
4555

Framework/include/QualityControl/TaskRunner.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717
#ifndef QC_CORE_TASKRUNNER_H
1818
#define QC_CORE_TASKRUNNER_H
1919

20-
//#include <boost/accumulators/accumulators.hpp>
21-
//#include <boost/accumulators/statistics.hpp>
2220
// O2
2321
#include <Common/Timer.h>
2422
#include <Framework/Task.h>
2523
#include <Framework/DataProcessorSpec.h>
2624
#include <Framework/CompletionPolicy.h>
25+
#include <Framework/EndOfStreamContext.h>
2726
#include <Headers/DataHeader.h>
2827
// QC
2928
#include "QualityControl/TaskConfig.h"
@@ -98,6 +97,9 @@ class TaskRunner : public framework::Task
9897
/// \brief Unified DataDescription naming scheme for all tasks
9998
static header::DataDescription createTaskDataDescription(const std::string& taskName);
10099

100+
/// \brief Callback for CallbackService::Id::EndOfStream
101+
void endOfStream(framework::EndOfStreamContext& eosContext) override;
102+
101103
private:
102104
/// \brief Callback for CallbackService::Id::Start (DPL) a.k.a. RUN transition (FairMQ)
103105
void start();
@@ -132,6 +134,7 @@ class TaskRunner : public framework::Task
132134
framework::Options mOptions;
133135

134136
bool mCycleOn = false;
137+
bool mNoMoreCycles = false;
135138
int mCycleNumber = 0;
136139

137140
// stats

Framework/src/DataProducer.cxx

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
/// \author Piotr Konopka
1414
///
1515
#include "QualityControl/DataProducer.h"
16+
#include "QualityControl/QcInfoLogger.h"
1617

1718
#include <random>
1819
#include <Common/Timer.h>
1920
#include <Monitoring/MonitoringFactory.h>
21+
#include <Framework/ControlService.h>
2022

2123
using namespace o2::framework;
2224
using namespace o2::monitoring;
@@ -27,28 +29,29 @@ using namespace AliceO2::Common;
2729
namespace o2::quality_control::core
2830
{
2931

30-
DataProcessorSpec getDataProducerSpec(size_t minSize, size_t maxSize, double rate, bool fill, size_t index, std::string monitoringUrl)
32+
DataProcessorSpec getDataProducerSpec(size_t minSize, size_t maxSize, double rate, uint64_t amount, size_t index,
33+
std::string monitoringUrl, bool fill)
3134
{
3235
return DataProcessorSpec{
3336
"producer-" + std::to_string(index),
3437
Inputs{},
3538
Outputs{
3639
{ { "out" }, "TST", "RAWDATA", static_cast<SubSpec>(index) } },
37-
getDataProducerAlgorithm({ "TST", "RAWDATA", static_cast<SubSpec>(index) }, minSize, maxSize, rate, fill, monitoringUrl)
40+
getDataProducerAlgorithm({ "TST", "RAWDATA", static_cast<SubSpec>(index) }, minSize, maxSize, rate, amount,
41+
monitoringUrl, fill)
3842
};
3943
}
4044

41-
framework::AlgorithmSpec
42-
getDataProducerAlgorithm(ConcreteDataMatcher output, size_t minSize, size_t maxSize, double rate, bool fill,
43-
std::string monitoringUrl)
45+
AlgorithmSpec getDataProducerAlgorithm(ConcreteDataMatcher output, size_t minSize, size_t maxSize, double rate,
46+
uint64_t amount, std::string monitoringUrl, bool fill)
4447
{
4548
return AlgorithmSpec{
4649
[=](InitContext&) {
4750
// this is the initialization code
4851
std::default_random_engine generator(time(nullptr));
4952
std::shared_ptr<Timer> timer = nullptr;
5053

51-
int messageCounter = 0;
54+
uint64_t messageCounter = 0;
5255
std::shared_ptr<monitoring::Monitoring> collector;
5356
if (!monitoringUrl.empty()) {
5457
collector = MonitoringFactory::Get(monitoringUrl);
@@ -59,6 +62,14 @@ framework::AlgorithmSpec
5962
return [=](ProcessingContext& processingContext) mutable {
6063
// everything inside this lambda function is invoked in a loop, because it this Data Processor has no inputs
6164

65+
// checking if we have reached the maximum amount of messages
66+
if (amount != 0 && messageCounter >= amount) {
67+
ILOG(Info) << "Reached the maximum number of messages, requesting to quit the producer and sending an EndOfStream" << ENDM;
68+
processingContext.services().get<ControlService>().endOfStream();
69+
processingContext.services().get<ControlService>().readyToQuit(QuitRequest::Me);
70+
return;
71+
}
72+
6273
// setting up the timer
6374
if (!timer) {
6475
timer = std::make_shared<Timer>();
@@ -72,17 +83,19 @@ framework::AlgorithmSpec
7283
timer->increment();
7384

7485
// generating data
75-
size_t length = minSize + (generator() % (maxSize - minSize));
86+
size_t length = (minSize == maxSize) ? minSize : (minSize + (generator() % (maxSize - minSize)));
7687
auto data = processingContext.outputs().make<char>({ output.origin, output.description, output.subSpec },
7788
length);
89+
++messageCounter;
7890
if (fill) {
7991
for (auto&& item : data) {
8092
item = static_cast<char>(generator());
8193
}
8294
}
8395

96+
// send metrics
8497
if (collector) {
85-
collector->send({ messageCounter++, "Data_producer_" + std::to_string(output.subSpec) + "_message_" },
98+
collector->send({ messageCounter, "Data_producer_" + std::to_string(output.subSpec) + "_message_" },
8699
DerivedMetricMode::RATE);
87100
}
88101
};

Framework/src/TaskRunner.cxx

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,16 @@ void TaskRunner::init(InitContext& iCtx)
8080
// init user's task
8181
mTask->loadCcdb(mTaskConfig.conditionUrl);
8282
mTask->initialize(iCtx);
83+
84+
mNoMoreCycles = false;
85+
mCycleNumber = 0;
8386
}
8487

8588
void TaskRunner::run(ProcessingContext& pCtx)
8689
{
87-
if (mTaskConfig.maxNumberCycles >= 0 && mCycleNumber >= mTaskConfig.maxNumberCycles) {
88-
LOG(INFO) << "The maximum number of cycles (" << mTaskConfig.maxNumberCycles << ") has been reached.";
90+
if (mNoMoreCycles) {
91+
ILOG(Info) << "The maximum number of cycles (" << mTaskConfig.maxNumberCycles << ") has been reached"
92+
<< " or the device has received an EndOfStream signal. Won't start a new cycle." << ENDM;
8993
return;
9094
}
9195

@@ -107,6 +111,8 @@ void TaskRunner::run(ProcessingContext& pCtx)
107111
}
108112
if (mTaskConfig.maxNumberCycles < 0 || mCycleNumber < mTaskConfig.maxNumberCycles) {
109113
startCycle();
114+
} else {
115+
mNoMoreCycles = true;
110116
}
111117
}
112118
}
@@ -117,7 +123,6 @@ CompletionPolicy::CompletionOp TaskRunner::completionPolicyCallback(o2::framewor
117123
// added, this will break.
118124
size_t dataInputsExpected = inputs.size() - 1;
119125
size_t dataInputsPresent = 0;
120-
size_t allInputs = 0;
121126

122127
CompletionPolicy::CompletionOp action = CompletionPolicy::CompletionOp::Wait;
123128

@@ -138,7 +143,6 @@ CompletionPolicy::CompletionOp TaskRunner::completionPolicyCallback(o2::framewor
138143

139144
LOG(DEBUG) << "Completion policy callback. "
140145
<< "Total inputs possible: " << inputs.size()
141-
<< ", inputs present: " << allInputs
142146
<< ", data inputs: " << dataInputsPresent
143147
<< ", timer inputs: " << (action == CompletionPolicy::CompletionOp::Consume);
144148

@@ -173,12 +177,20 @@ header::DataDescription TaskRunner::createTaskDataDescription(const std::string&
173177
return description;
174178
}
175179

180+
void TaskRunner::endOfStream(framework::EndOfStreamContext& eosContext)
181+
{
182+
ILOG(Info) << "Received an EndOfStream, finishing the current cycle" << ENDM;
183+
finishCycle(eosContext.outputs());
184+
mNoMoreCycles = true;
185+
}
186+
176187
void TaskRunner::start()
177188
{
178189
startOfActivity();
179190

180-
if (mTaskConfig.maxNumberCycles >= 0 && mCycleNumber >= mTaskConfig.maxNumberCycles) {
181-
LOG(INFO) << "The maximum number of cycles (" << mTaskConfig.maxNumberCycles << ") has been reached.";
191+
if (mNoMoreCycles) {
192+
ILOG(Info) << "The maximum number of cycles (" << mTaskConfig.maxNumberCycles << ") has been reached"
193+
<< " or the device has received an EndOfStream signal. Won't start a new cycle." << ENDM;
182194
return;
183195
}
184196

Framework/src/runDataProducer.cxx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
4141
ConfigParamSpec{ "empty", VariantType::Bool, false, { "Don't fill messages with random data." } });
4242
workflowOptions.push_back(
4343
ConfigParamSpec{ "message-rate", VariantType::Double, 10.0, { "Rate of messages per second." } });
44+
workflowOptions.push_back(
45+
ConfigParamSpec{ "message-amount", VariantType::Int, 0, { "Amount of messages to be produced in total (0 for inf)." } });
4446
workflowOptions.push_back(
4547
ConfigParamSpec{ "producers", VariantType::Int, 1, { "Number of producers. Each will have unique SubSpec, counting from 0." } });
4648
workflowOptions.push_back(
@@ -58,12 +60,13 @@ WorkflowSpec defineDataProcessing(const ConfigContext& config)
5860
size_t maxSize = config.options().get<int>("max-size");
5961
bool fill = !config.options().get<bool>("empty");
6062
double rate = config.options().get<double>("message-rate");
63+
uint64_t amount = config.options().get<int>("message-amount");
6164
size_t producers = config.options().get<int>("producers");
6265
std::string monitoringUrl = config.options().get<std::string>("monitoring-url");
6366

6467
WorkflowSpec specs;
6568
for (size_t i = 0; i < producers; i++) {
66-
specs.push_back(getDataProducerSpec(minSize, maxSize, rate, fill, i, monitoringUrl));
69+
specs.push_back(getDataProducerSpec(minSize, maxSize, rate, amount, i, monitoringUrl, fill));
6770
}
6871
return specs;
6972
}

0 commit comments

Comments
 (0)