Skip to content

Commit ddadadb

Browse files
authored
[QC-114] Use a container to send all MOs at once (#93)
* Use container to send all MOs at once * Send TObjArray instead of MOs in Checker
1 parent 150d894 commit ddadadb

9 files changed

Lines changed: 79 additions & 70 deletions

File tree

Framework/include/QualityControl/Checker.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class Checker : public framework::Task
8282
/**
8383
* \brief Send the MonitorObject on FairMQ to whoever is listening.
8484
*/
85-
void send(std::shared_ptr<MonitorObject> mo, framework::DataAllocator& allocator);
85+
void send(std::unique_ptr<TObjArray>& mo, framework::DataAllocator& allocator);
8686

8787
/**
8888
* \brief Load a library.

Framework/include/QualityControl/HistoMerger.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class HistoMerger : public framework::Task
5656
private:
5757
// General state
5858
std::string mMergerName;
59-
std::shared_ptr<MonitorObject> mMonitorObject;
59+
TObjArray mMergedArray;
6060
AliceO2::Common::Timer mPublicationTimer;
6161

6262
// DPL

Framework/include/QualityControl/ObjectsManager.h

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "QualityControl/MonitorObject.h"
1010
#include "QualityControl/Quality.h"
1111
#include "QualityControl/TaskConfig.h"
12+
#include <TObjArray.h>
1213
#include <TObjString.h>
1314
#include <boost/concept_check.hpp>
1415
#include <string>
@@ -68,22 +69,15 @@ class ObjectsManager
6869

6970
TObject* getObject(std::string objectName);
7071

71-
typedef typename std::map<std::string, quality_control::core::MonitorObject*>::iterator iterator;
72-
typedef typename std::map<std::string, quality_control::core::MonitorObject*>::const_iterator const_iterator;
73-
74-
const_iterator begin() const { return mMonitorObjects.begin(); }
75-
const_iterator end() const { return mMonitorObjects.end(); }
76-
77-
iterator begin() { return mMonitorObjects.begin(); }
78-
iterator end() { return mMonitorObjects.end(); }
72+
TObjArray* getNonOwningArray() const { return new TObjArray(mMonitorObjects); };
7973

8074
std::string getObjectsListString() { return mObjectsList.GetString().Data(); }
8175

8276
private:
8377
void UpdateIndex(const std::string& nonEmptyName);
8478

8579
private:
86-
std::map<std::string /*object name*/, quality_control::core::MonitorObject* /* object */> mMonitorObjects;
80+
TObjArray mMonitorObjects;
8781
std::string mTaskName;
8882
// todo make it a vector of string when support added
8983
TObjString mObjectsList; // the list of objects we publish. (comma separated)

Framework/src/Checker.cxx

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include <Common/Exceptions.h>
1515
#include <Configuration/ConfigurationFactory.h>
1616
#include <Framework/DataRefUtils.h>
17+
#include <TMap.h>
1718
// QC
1819
#include "QualityControl/DatabaseFactory.h"
1920
#include "QualityControl/TaskRunner.h"
@@ -101,20 +102,26 @@ void Checker::run(framework::ProcessingContext& ctx)
101102
startFirstObject = system_clock::now();
102103
}
103104

104-
for (auto&& input : ctx.inputs()) {
105-
// will that work properly with shmem?
106-
std::shared_ptr<MonitorObject> mo{ std::move(framework::DataRefUtils::as<MonitorObject>(input)) };
105+
std::shared_ptr<TObjArray> moArray{ std::move(framework::DataRefUtils::as<TObjArray>(*ctx.inputs().begin())) };
106+
moArray->SetOwner(false);
107+
auto checkedMoArray = std::make_unique<TObjArray>();
108+
checkedMoArray->SetOwner();
107109

110+
for (const auto& to : *moArray) {
111+
std::shared_ptr<MonitorObject> mo{dynamic_cast<MonitorObject*>(to)};
112+
moArray->RemoveFirst();
108113
if (mo) {
109114
check(mo);
110115
store(mo);
111-
send(mo, ctx.outputs());
112116
mTotalNumberHistosReceived++;
117+
checkedMoArray->Add(new MonitorObject(*mo));
113118
} else {
114119
mLogger << "the mo is null" << AliceO2::InfoLogger::InfoLogger::endm;
115120
}
116121
}
117122

123+
send(checkedMoArray, ctx.outputs());
124+
118125
// monitoring
119126
endLastObject = system_clock::now();
120127

@@ -169,13 +176,12 @@ void Checker::store(std::shared_ptr<MonitorObject> mo)
169176
}
170177
}
171178

172-
void Checker::send(std::shared_ptr<MonitorObject> mo, framework::DataAllocator& allocator)
179+
void Checker::send(std::unique_ptr<TObjArray>& moArray, framework::DataAllocator& allocator)
173180
{
174-
mLogger << "Sending \"" << mo->getName() << "\"" << AliceO2::InfoLogger::InfoLogger::endm;
175-
176-
// todo: consider adopting
177-
allocator.snapshot<MonitorObject>(
178-
framework::Output{ mOutputSpec.origin, mOutputSpec.description, mOutputSpec.subSpec, mOutputSpec.lifetime }, *mo);
181+
mLogger << "Sending Monitor Object array with " << moArray->GetEntries() << " objects inside." << AliceO2::InfoLogger::InfoLogger::endm;
182+
183+
allocator.adopt(
184+
framework::Output{ mOutputSpec.origin, mOutputSpec.description, mOutputSpec.subSpec, mOutputSpec.lifetime }, moArray.release());
179185
}
180186

181187
void Checker::loadLibrary(const std::string libraryName)

Framework/src/HistoMerger.cxx

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include "QualityControl/HistoMerger.h"
77

88
#include <Framework/DataRefUtils.h>
9+
#include <TObjArray.h>
910

1011
namespace o2
1112
{
@@ -22,33 +23,42 @@ HistoMerger::HistoMerger(std::string mergerName, double publicationPeriodSeconds
2223
: mMergerName(mergerName), mOutputSpec{ header::gDataOriginInvalid, header::gDataDescriptionInvalid }
2324
{
2425
mPublicationTimer.reset(static_cast<int>(publicationPeriodSeconds * 1000000));
26+
mMergedArray.SetOwner(true);
2527
}
2628

2729
HistoMerger::~HistoMerger() {}
2830

29-
void HistoMerger::init(framework::InitContext& ctx) { mMonitorObject.reset(); }
31+
void HistoMerger::init(framework::InitContext&) { mMergedArray.Clear(); }
3032

3133
void HistoMerger::run(framework::ProcessingContext& ctx)
3234
{
3335
for (const auto& input : ctx.inputs()) {
34-
if (input.header != nullptr && input.spec != nullptr &&
35-
std::strstr(DataRefUtils::as<MonitorObject>(input)->getObject()->ClassName(), "TH1") != nullptr) {
36+
if (input.header != nullptr && input.spec != nullptr) {
37+
std::unique_ptr<TObjArray> moArray = DataRefUtils::as<TObjArray>(input);
3638

37-
if (!mMonitorObject) {
38-
mMonitorObject.reset(DataRefUtils::as<MonitorObject>(input).release());
39+
if (mMergedArray.IsEmpty()) {
40+
mMergedArray = *moArray.release();
3941
} else {
40-
TH1* h = dynamic_cast<TH1*>(mMonitorObject->getObject());
41-
const TH1* hUpdate = dynamic_cast<TH1*>(DataRefUtils::as<MonitorObject>(input)->getObject());
42-
h->Add(hUpdate);
42+
if (mMergedArray.GetSize() != moArray->GetSize()) {
43+
LOG(ERROR) << "array don't match in size, " << mMergedArray.GetSize() << " vs " << moArray->GetSize();
44+
return;
45+
}
46+
47+
for (int i = 0; i < mMergedArray.GetEntries(); i++) {
48+
MonitorObject* mo = dynamic_cast<MonitorObject*>((*moArray)[i]);
49+
if (mo && std::strstr(mo->getObject()->ClassName(), "TH1") != nullptr) {
50+
TH1* h = dynamic_cast<TH1*>(dynamic_cast<MonitorObject*>(mMergedArray[i])->getObject());
51+
const TH1* hUpdate = dynamic_cast<TH1*>(mo->getObject());
52+
h->Add(hUpdate);
53+
}
54+
}
4355
}
4456
}
4557
}
4658
if (mPublicationTimer.isTimeout()) {
47-
if (mMonitorObject) {
48-
ctx.outputs().snapshot<MonitorObject>(Output{ mOutputSpec.origin, mOutputSpec.description, mOutputSpec.subSpec },
49-
*mMonitorObject);
59+
if (!mMergedArray.IsEmpty()) {
60+
ctx.outputs().snapshot(Output{ mOutputSpec.origin, mOutputSpec.description, mOutputSpec.subSpec }, mMergedArray);
5061
}
51-
5262
// avoid publishing mo many times consecutively because of too long initial waiting time
5363
do {
5464
mPublicationTimer.increment();

Framework/src/ObjectsManager.cxx

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,19 @@ namespace core
3131
ObjectsManager::ObjectsManager(TaskConfig& taskConfig) : mTaskName(taskConfig.taskName)
3232
{
3333
startPublishing(&mObjectsList, MonitorObject::SYSTEM_OBJECT_PUBLICATION_LIST);
34+
mMonitorObjects.SetOwner(true);
3435
}
3536

3637
ObjectsManager::~ObjectsManager()
3738
{
38-
for (auto& mMonitorObject : mMonitorObjects) {
39-
delete mMonitorObject.second;
40-
}
41-
mMonitorObjects.clear();
4239
}
4340

4441
void ObjectsManager::startPublishing(TObject* object, std::string objectName)
4542
{
4643
std::string nonEmptyName = objectName.empty() ? object->GetName() : objectName;
4744
auto* newObject = new MonitorObject(nonEmptyName, object, mTaskName);
4845
newObject->setIsOwner(false);
49-
mMonitorObjects[nonEmptyName] = newObject;
46+
mMonitorObjects.Add(newObject);
5047

5148
// update index
5249
if (objectName != MonitorObject::SYSTEM_OBJECT_PUBLICATION_LIST) {
@@ -83,8 +80,10 @@ void ObjectsManager::addCheck(const std::string& objectName, const std::string&
8380

8481
MonitorObject* ObjectsManager::getMonitorObject(std::string objectName)
8582
{
86-
if (mMonitorObjects.count(objectName) > 0) {
87-
return mMonitorObjects[objectName];
83+
TObject* mo = mMonitorObjects.FindObject(objectName.c_str());
84+
85+
if (mo) {
86+
return dynamic_cast<MonitorObject*>(mo);
8887
} else {
8988
BOOST_THROW_EXCEPTION(ObjectNotFoundError() << errinfo_object_name(objectName));
9089
}

Framework/src/TaskRunner.cxx

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ TaskRunner::TaskRunner(std::string taskName, std::string configurationSource, si
4747
mLastNumberObjects(0),
4848
mCycleOn(false),
4949
mCycleNumber(0),
50-
mMonitorObjectsSpec(createTaskDataOrigin(), createTaskDataDescription(taskName), id),
50+
mMonitorObjectsSpec({"mo"}, createTaskDataOrigin(), createTaskDataDescription(taskName), id),
5151
mResetAfterPublish(false),
5252
mTask(nullptr)
5353
{
@@ -269,20 +269,15 @@ void TaskRunner::finishCycle(DataAllocator& outputs)
269269

270270
unsigned long TaskRunner::publish(DataAllocator& outputs)
271271
{
272-
unsigned int sentMessages = 0;
273-
274-
for (auto& pair : *mObjectsManager) {
275-
276-
auto* mo = pair.second;
277-
outputs.snapshot<MonitorObject>(Output{ mMonitorObjectsSpec.origin, mMonitorObjectsSpec.description,
278-
mMonitorObjectsSpec.subSpec, mMonitorObjectsSpec.lifetime },
279-
*mo);
280-
281-
QcInfoLogger::GetInstance() << "Sending \"" << mo->getName() << "\"" << AliceO2::InfoLogger::InfoLogger::endm;
282-
sentMessages++;
283-
}
284-
285-
return sentMessages;
272+
outputs.adopt(
273+
Output{ mMonitorObjectsSpec.origin,
274+
mMonitorObjectsSpec.description,
275+
mMonitorObjectsSpec.subSpec,
276+
mMonitorObjectsSpec.lifetime },
277+
dynamic_cast<TObject*>(mObjectsManager->getNonOwningArray())
278+
);
279+
280+
return 1;
286281
}
287282

288283
void TaskRunner::CustomCleanupTMessage(void* data, void* object) { delete (TMessage*)object; }

Framework/src/runBasic.cxx

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,19 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
108108

109109
return (AlgorithmSpec::ProcessCallback) [](ProcessingContext& processingContext) mutable {
110110
LOG(INFO) << "printer invoked";
111-
auto mo = processingContext.inputs().get<MonitorObject*>("checked-mo").get();
111+
std::shared_ptr<TObjArray> moArray{ std::move(DataRefUtils::as<TObjArray>(*processingContext.inputs().begin())) };
112112

113-
if (mo->getName() == "example") {
114-
auto* g = dynamic_cast<TH1F*>(mo->getObject());
115-
std::string bins = "BINS:";
116-
for (int i = 0; i < g->GetNbinsX(); i++) {
117-
bins += " " + std::to_string((int) g->GetBinContent(i));
113+
for (const auto& to : *moArray) {
114+
MonitorObject* mo = dynamic_cast<MonitorObject*>(to);
115+
116+
if (mo->getName() == "example") {
117+
auto* g = dynamic_cast<TH1F*>(mo->getObject());
118+
std::string bins = "BINS:";
119+
for (int i = 0; i < g->GetNbinsX(); i++) {
120+
bins += " " + std::to_string((int) g->GetBinContent(i));
121+
}
122+
LOG(INFO) << bins;
118123
}
119-
LOG(INFO) << bins;
120124
}
121125
};
122126
}

Framework/src/runMergerTest.cxx

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,13 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
6161
histo->Fill(p/(double)producersAmount);
6262

6363
MonitorObject* mo = new MonitorObject("histo", histo, "histo-task");
64-
mo->setIsOwner(false);
64+
mo->setIsOwner(true);
6565

66-
processingContext.outputs().snapshot<MonitorObject>(Output{"TST", "HISTO", p + 1}, *mo);
67-
// processingContext.outputs().snapshot<MonitorObject>(OutputRef{ "mo", p + 1 }, *mo); // dpl template bug?
68-
delete mo;
69-
delete histo;
66+
TObjArray* array = new TObjArray;
67+
array->SetOwner(true);
68+
array->Add(mo);
69+
70+
processingContext.outputs().adopt(Output{"TST", "HISTO", p + 1}, array);
7071
}
7172
}
7273
};
@@ -87,15 +88,15 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
8788
DataProcessorSpec printer{
8889
"printer",
8990
Inputs{
90-
{ "mo", "TST", "HISTO", 0 }
91+
{ "moarray", "TST", "HISTO", 0 }
9192
},
9293
Outputs{},
9394
AlgorithmSpec{
9495
(AlgorithmSpec::InitCallback) [](InitContext&) {
95-
9696
return (AlgorithmSpec::ProcessCallback) [](ProcessingContext& processingContext) mutable {
9797
LOG(INFO) << "printer invoked";
98-
auto mo = processingContext.inputs().get<MonitorObject*>("mo").get();
98+
auto moArray = processingContext.inputs().get<TObjArray*>("moarray");
99+
auto mo = dynamic_cast<MonitorObject*>(moArray->First());
99100

100101
if (mo->getName() == "histo") {
101102
auto* g = dynamic_cast<TH1F*>(mo->getObject());

0 commit comments

Comments
 (0)