Skip to content

Commit 1d7f8b8

Browse files
authored
[QC-1010] Make activity available in aggregator (#1953)
* [QC-1010] Aggregator -> add activity * doc * Update Advanced.md * actually use the setActivity and switch to a shared pointer (as it is done in the Checks) * format * make mActivity private
1 parent 96e3019 commit 1d7f8b8

6 files changed

Lines changed: 34 additions & 11 deletions

File tree

Framework/include/QualityControl/Aggregator.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class Aggregator
7575
std::vector<AggregatorSource> getSources() const;
7676
std::vector<AggregatorSource> getSources(core::DataSourceType type);
7777
const std::string& getDetector() const { return mAggregatorConfig.detectorName; };
78+
void setActivity(std::shared_ptr<core::Activity> activity);
7879

7980
static AggregatorConfig extractConfig(const core::CommonSpec&, const AggregatorSpec&);
8081

Framework/include/QualityControl/AggregatorInterface.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,13 @@ class AggregatorInterface : public o2::quality_control::core::UserCodeInterface
4444
/// @return The new qualities, associated with a name.
4545
virtual std::map<std::string, o2::quality_control::core::Quality> aggregate(std::map<std::string, std::shared_ptr<const o2::quality_control::core::QualityObject>>& qoMap) = 0;
4646

47-
protected:
48-
ClassDef(AggregatorInterface, 2)
47+
void setActivity(std::shared_ptr<core::Activity> activity) { mActivity = activity; }
48+
std::shared_ptr<const core::Activity> getActivity() const { return mActivity; }
49+
50+
private:
51+
std::shared_ptr<core::Activity> mActivity; // TODO should probably go to UserCodeInterface
52+
53+
ClassDef(AggregatorInterface, 3)
4954
};
5055

5156
} // namespace o2::quality_control::checker

Framework/include/QualityControl/AggregatorRunner.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "QualityControl/Activity.h"
3030
#include "QualityControl/AggregatorRunnerConfig.h"
3131
#include "QualityControl/AggregatorConfig.h"
32+
#include "QualityControl/Activity.h"
3233

3334
namespace o2::framework
3435
{
@@ -175,7 +176,7 @@ class AggregatorRunner : public framework::Task
175176

176177
// General state
177178
std::string mDeviceName;
178-
core::Activity mActivity;
179+
std::shared_ptr<core::Activity> mActivity; // shareable with the Aggregators
179180
std::vector<std::shared_ptr<Aggregator>> mAggregators;
180181
std::shared_ptr<o2::quality_control::repository::DatabaseInterface> mDatabase;
181182
AggregatorRunnerConfig mRunnerConfig;

Framework/src/Aggregator.cxx

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,4 +224,13 @@ AggregatorConfig Aggregator::extractConfig(const core::CommonSpec& commonSpec, c
224224
};
225225
}
226226

227+
void Aggregator::setActivity(std::shared_ptr<core::Activity> activity)
228+
{
229+
if (mAggregatorInterface) {
230+
mAggregatorInterface->setActivity(std::move(activity));
231+
} else {
232+
throw std::runtime_error("Trying to set Activity on an empty AggregatorInterface '" + mAggregatorConfig.name + "'");
233+
}
234+
}
235+
227236
} // namespace o2::quality_control::checker

Framework/src/AggregatorRunner.cxx

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ QualityObjectsType AggregatorRunner::aggregate()
214214

215215
if (updatePolicyManager.isReady(aggregatorName)) {
216216
ILOG(Info, Devel) << " Quality Objects for the aggregator '" << aggregatorName << "' are ready, aggregating" << ENDM;
217-
auto newQOs = aggregator->aggregate(mQualityObjects, mActivity); // we give the whole list
217+
auto newQOs = aggregator->aggregate(mQualityObjects, *mActivity); // we give the whole list
218218
mTotalNumberObjectsProduced += newQOs.size();
219219
mTotalNumberAggregatorExecuted++;
220220
// we consider the output of the aggregators the same way we do the output of a check
@@ -421,17 +421,20 @@ void AggregatorRunner::sendPeriodicMonitoring()
421421

422422
void AggregatorRunner::start(ServiceRegistryRef services)
423423
{
424-
mActivity = computeActivity(services, mRunnerConfig.fallbackActivity);
424+
mActivity = std::make_shared<Activity>(computeActivity(services, mRunnerConfig.fallbackActivity));
425425
mTimerTotalDurationActivity.reset();
426-
QcInfoLogger::setRun(mActivity.mId);
427-
QcInfoLogger::setPartition(mActivity.mPartitionName);
428-
ILOG(Info, Support) << "Starting run " << mActivity.mId << ENDM;
426+
QcInfoLogger::setRun(mActivity->mId);
427+
QcInfoLogger::setPartition(mActivity->mPartitionName);
428+
ILOG(Info, Support) << "Starting run " << mActivity->mId << ENDM;
429+
for (auto& aggregator : mAggregators) {
430+
aggregator->setActivity(mActivity);
431+
}
429432

430433
// register ourselves to the BK
431434
if (gSystem->Getenv("O2_QC_REGISTER_IN_BK")) { // until we are sure it works, we have to turn it on
432435
ILOG(Debug, Devel) << "Registering aggregator to BookKeeping" << ENDM;
433436
try {
434-
Bookkeeping::getInstance().registerProcess(mActivity.mId, mDeviceName, AggregatorRunner::getDetectorName(mAggregators), bookkeeping::DPL_PROCESS_TYPE_QC_AGGREGATOR, "");
437+
Bookkeeping::getInstance().registerProcess(mActivity->mId, mDeviceName, AggregatorRunner::getDetectorName(mAggregators), bookkeeping::DPL_PROCESS_TYPE_QC_AGGREGATOR, "");
435438
} catch (std::runtime_error& error) {
436439
ILOG(Warning, Devel) << "Failed registration to the BookKeeping: " << error.what() << ENDM;
437440
}
@@ -440,7 +443,7 @@ void AggregatorRunner::start(ServiceRegistryRef services)
440443

441444
void AggregatorRunner::stop()
442445
{
443-
ILOG(Info, Support) << "Stopping run " << mActivity.mId << ENDM;
446+
ILOG(Info, Support) << "Stopping run " << mActivity->mId << ENDM;
444447
}
445448

446449
void AggregatorRunner::reset()
@@ -449,7 +452,7 @@ void AggregatorRunner::reset()
449452

450453
try {
451454
mCollector.reset();
452-
mActivity = Activity();
455+
mActivity = make_shared<Activity>();
453456
} catch (...) {
454457
// we catch here because we don't know where it will go in DPL's CallbackService
455458
ILOG(Error, Support) << "Error caught in reset() : "

doc/Advanced.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1129,6 +1129,10 @@ In a task, the `activity` is provided in `startOfActivity`.
11291129

11301130
In a Check, it is returned by `getActivity()`.
11311131

1132+
In an Aggregator, it is returned by `getActivity()`.
1133+
1134+
In a postprocessing task, it is available in the objects manager: `getObjectsManager()->getActivity()`
1135+
11321136
## Definition of new arguments
11331137

11341138
One can also tell the DPL driver to accept new arguments. This is done using the `customize` method at the top of your workflow definition (usually called "runXXX" in the QC).

0 commit comments

Comments
 (0)