Skip to content

Commit a9056e9

Browse files
Syncing to new DPL CompletionPolicy API after O2 PR 2886 (#304)
Pull request AliceO2Group/AliceO2#2886 changes the argument of the policy callback from span<PartRef> to span<DataRef> which is a span of non-message-owning reference objects. In order to make depending packages easier to implement, the callback signature is now defined in CompletionPolicy.h header in AliceO2Group/AliceO2#2918, thee definitions are now used in the TaskRunner.
1 parent 4127fa8 commit a9056e9

3 files changed

Lines changed: 7 additions & 14 deletions

File tree

Framework/include/QualityControl/TaskRunner.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class TaskRunner : public framework::Task
8282
void run(framework::ProcessingContext& pCtx) override;
8383

8484
/// \brief TaskRunner's completion policy callback
85-
static framework::CompletionPolicy::CompletionOp completionPolicyCallback(gsl::span<framework::PartRef const> const& inputs);
85+
static framework::CompletionPolicy::CompletionOp completionPolicyCallback(o2::framework::CompletionPolicy::InputSet inputs);
8686

8787
std::string getDeviceName() { return mDeviceName; };
8888
const framework::Inputs& getInputsSpecs() { return mInputSpecs; };

Framework/src/TaskRunner.cxx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <Monitoring/MonitoringFactory.h>
2525
#include <Framework/DataSampling.h>
2626
#include <Framework/CallbackService.h>
27+
#include <Framework/CompletionPolicyHelpers.h>
2728
#include <Framework/TimesliceIndex.h>
2829
#include <Framework/DataSpecUtils.h>
2930
#include <Framework/DataDescriptorQueryBuilder.h>
@@ -110,7 +111,7 @@ void TaskRunner::run(ProcessingContext& pCtx)
110111
}
111112
}
112113

113-
CompletionPolicy::CompletionOp TaskRunner::completionPolicyCallback(gsl::span<PartRef const> const& inputs)
114+
CompletionPolicy::CompletionOp TaskRunner::completionPolicyCallback(o2::framework::CompletionPolicy::InputSet inputs)
114115
{
115116
// fixme: we assume that there is one timer input and the rest are data inputs. If some other implicit inputs are
116117
// added, this will break.
@@ -125,7 +126,7 @@ CompletionPolicy::CompletionOp TaskRunner::completionPolicyCallback(gsl::span<Pa
125126
continue;
126127
}
127128

128-
const auto* dataHeader = get<DataHeader*>(input.header.get()->GetData());
129+
const auto* dataHeader = CompletionPolicyHelpers::getHeader<DataHeader>(input);
129130
assert(dataHeader);
130131

131132
if (!strncmp(dataHeader->dataDescription.str, "TIMER", 5)) {

Framework/src/runAdvanced.cxx

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include <Framework/CompletionPolicyHelpers.h>
3535
#include <Framework/DataSampling.h>
3636
#include <Framework/DataSpecUtils.h>
37+
#include <Framework/CompletionPolicyHelpers.h>
3738
#include "QualityControl/InfrastructureGenerator.h"
3839

3940
using namespace o2;
@@ -48,16 +49,7 @@ void customize(std::vector<CompletionPolicy>& policies)
4849
{
4950
DataSampling::CustomizeInfrastructure(policies);
5051
quality_control::customizeInfrastructure(policies);
51-
CompletionPolicy mergerConsumesASAP{
52-
"mergers-always-consume",
53-
[](DeviceSpec const& device) {
54-
return device.name.find("merger") != std::string::npos;
55-
},
56-
[](gsl::span<PartRef const> const& /*inputs*/) {
57-
return CompletionPolicy::CompletionOp::Consume;
58-
}
59-
};
60-
policies.push_back(mergerConsumesASAP);
52+
policies.push_back(CompletionPolicyHelpers::defineByName(".*merger.*", CompletionPolicy::CompletionOp::Consume));
6153
}
6254

6355
void customize(std::vector<ChannelConfigurationPolicy>& policies)
@@ -169,4 +161,4 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
169161
quality_control::generateRemoteInfrastructure(specs, qcConfigurationSource);
170162

171163
return specs;
172-
}
164+
}

0 commit comments

Comments
 (0)