Skip to content

Commit 95a36c5

Browse files
committed
DPL: add support for writing general Metadata to AO2D
1 parent 91c2ccd commit 95a36c5

8 files changed

Lines changed: 125 additions & 18 deletions

File tree

Framework/AnalysisSupport/src/AODWriterHelpers.cxx

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "Framework/EndOfStreamContext.h"
1717
#include "Framework/ProcessingContext.h"
1818
#include "Framework/InitContext.h"
19+
#include "Framework/Output.h"
1920
#include "Framework/CallbackService.h"
2021
#include "Framework/AnalysisSupportHelpers.h"
2122
#include "Framework/TableConsumer.h"
@@ -31,6 +32,10 @@
3132
#include <TMap.h>
3233
#include <TObjString.h>
3334
#include <arrow/table.h>
35+
#include <algorithm>
36+
#include <string>
37+
#include <utility>
38+
#include <vector>
3439

3540
O2_DECLARE_DYNAMIC_LOG(histogram_registry);
3641

@@ -477,4 +482,46 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& /*ct
477482
};
478483
}};
479484
}
485+
486+
/// Build the algorithm run by the metadata collector device.
///
/// Every invocation reads all parts available on input slot 0 (binding
/// "meta") as TMap objects, folds their key/value names into a store that
/// persists across invocations, and then publishes the complete store as two
/// parallel std::vector<TString> snapshots: AMD/AODMetadataKeys and
/// AMD/AODMetadataVals.
///
/// Merge rules:
///  - a key seen for the first time is appended, so first-appearance order is kept;
///  - a key seen again has its value overwritten (last writer wins);
///  - a key without an associated value is stored with an empty string.
///
/// @param ctx unused
/// @return the AlgorithmSpec wrapping the stateful processing callback
AlgorithmSpec AODWriterHelpers::getMetadataCollector(ConfigContext const& /*ctx*/)
{
  using Entry = std::pair<std::string, std::string>;

  return AlgorithmSpec{[](InitContext&) -> std::function<void(ProcessingContext&)> {
    // Shared between all invocations of the callback returned below, so the
    // merged view keeps growing while the device is running.
    auto store = std::make_shared<std::vector<Entry>>();

    return [store](ProcessingContext& pc) -> void {
      // Overwrite the value of a known key, otherwise append a new entry.
      auto upsert = [&store](std::string name, std::string content) {
        auto pos = std::find_if(store->begin(), store->end(),
                                [&name](Entry const& entry) { return entry.first == name; });
        if (pos == store->end()) {
          store->emplace_back(std::move(name), std::move(content));
          return;
        }
        pos->second = std::move(content);
      };

      auto partCount = pc.inputs().getNofParts(0);
      for (auto partIdx = 0U; partIdx < partCount; ++partIdx) {
        auto metaMap = pc.inputs().get<TMap*>("meta", partIdx);
        if (metaMap) {
          TIter cursor(metaMap.get());
          for (TObject* key = cursor(); key != nullptr; key = cursor()) {
            TObject* value = metaMap->GetValue(key);
            upsert(key->GetName(), value != nullptr ? value->GetName() : "");
          }
        }
      }

      // Republish the full merged state as keys/vals vectors.
      // NOTE(review): the original author states the AOD writer already turns
      // these two vectors into the AO2D metaData TMap - confirm on the writer side.
      std::vector<TString> keys;
      std::vector<TString> vals;
      keys.reserve(store->size());
      vals.reserve(store->size());
      for (auto const& entry : *store) {
        keys.emplace_back(entry.first);
        vals.emplace_back(entry.second);
      }
      LOG(debug) << "metadata-collector: emitting " << keys.size() << " aggregated metadata entries";
      pc.outputs().snapshot(Output{"AMD", "AODMetadataKeys", 0}, keys);
      pc.outputs().snapshot(Output{"AMD", "AODMetadataVals", 0}, vals);
    };
  }};
}
480527
} // namespace o2::framework::writers

Framework/AnalysisSupport/src/AODWriterHelpers.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ namespace o2::framework::writers
2121
/// Collection of static factories, each returning the AlgorithmSpec of one
/// internal device. The implementations live in AODWriterHelpers.cxx.
struct AODWriterHelpers {
2222
/// NOTE(review): presumably the writer for OutputObj histograms - body not
/// visible from here, confirm against AODWriterHelpers.cxx.
static AlgorithmSpec getOutputObjHistWriter(ConfigContext const& context);
2323
/// NOTE(review): presumably the writer storing tables as TTrees - body not
/// visible from here, confirm against AODWriterHelpers.cxx.
static AlgorithmSpec getOutputTTreeWriter(ConfigContext const& context);
24+
/// Algorithm which merges the TMap parts received on the "meta" input
/// (a repeated key keeps the latest value) and snapshots the merged result
/// as AMD/AODMetadataKeys and AMD/AODMetadataVals. The context is unused.
static AlgorithmSpec getMetadataCollector(ConfigContext const& context);
2425
};
2526

2627
} // namespace o2::framework::writers

Framework/AnalysisSupport/src/Plugin.cxx

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ struct ROOTTTreeWriter : o2::framework::AlgorithmPlugin {
5454
}
5555
};
5656

57+
/// Algorithm plugin whose create() returns the metadata collector algorithm
/// built by AODWriterHelpers::getMetadataCollector.
struct ROOTMetadataCollector : o2::framework::AlgorithmPlugin {
  o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& config) override
  {
    using o2::framework::writers::AODWriterHelpers;
    return AODWriterHelpers::getMetadataCollector(config);
  }
};
63+
5764
using namespace o2::framework;
5865
struct RunSummary : o2::framework::ServicePlugin {
5966
o2::framework::ServiceSpec* create() final
@@ -286,6 +293,7 @@ DEFINE_DPL_PLUGINS_BEGIN
286293
DEFINE_DPL_PLUGIN_INSTANCE(ROOTFileReader, CustomAlgorithm);
287294
DEFINE_DPL_PLUGIN_INSTANCE(ROOTObjWriter, CustomAlgorithm);
288295
DEFINE_DPL_PLUGIN_INSTANCE(ROOTTTreeWriter, CustomAlgorithm);
296+
DEFINE_DPL_PLUGIN_INSTANCE(ROOTMetadataCollector, CustomAlgorithm);
289297
DEFINE_DPL_PLUGIN_INSTANCE(RunSummary, CustomService);
290298
DEFINE_DPL_PLUGIN_INSTANCE(DiscoverMetadataInAOD, ConfigDiscovery);
291299
DEFINE_DPL_PLUGINS_END

Framework/Core/include/Framework/AnalysisSupportHelpers.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ struct AnalysisSupportHelpers {
4848
static DataProcessorSpec getOutputObjHistSink(ConfigContext const&);
4949
/// writes inputs of kind AOD to file
5050
static DataProcessorSpec getGlobalAODSink(ConfigContext const&);
51+
/// Match all inputs of kind META, merge them and republish as the AOD
52+
/// metadata keys/vals consumed by the AOD writer.
53+
static DataProcessorSpec getMetadataCollectorSink(ConfigContext const&);
5154
/// Get the data director
5255
static std::shared_ptr<DataOutputDirector> getDataOutputDirector(ConfigContext const& ctx);
5356
};

Framework/Core/src/AnalysisSupportHelpers.cxx

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,4 +220,23 @@ DataProcessorSpec
220220

221221
return spec;
222222
}
223+
224+
/// Describe the "internal-dpl-metadata-collector" device.
///
/// The device has a single input, "meta", matching any data description of
/// origin META, and two outputs, AMD/AODMetadataKeys (label "keys") and
/// AMD/AODMetadataVals (label "vals"), both with subspecification 0. Its
/// algorithm is loaded from the O2FrameworkAnalysisSupport plugin library
/// under the name ROOTMetadataCollector.
///
/// @param ctx configuration context forwarded to the plugin loader
/// @return the fully populated DataProcessorSpec
DataProcessorSpec
  AnalysisSupportHelpers::getMetadataCollectorSink(ConfigContext const& ctx)
{
  // Input and outputs are all Lifetime::Sporadic: META messages are not
  // produced every timeframe. When the collected parts get processed is
  // decided by the oldest-possible-timeframe completion policy registered
  // for this device name in CompletionPolicy::createDefaultPolicies.
  header::DataOrigin const collectedOrigin{"META"};
  // Origin under which the merged keys/vals are republished.
  header::DataOrigin const republishedOrigin{"AMD"};

  return DataProcessorSpec{
    .name = "internal-dpl-metadata-collector",
    .inputs = {InputSpec("meta", DataSpecUtils::dataDescriptorMatcherFrom(collectedOrigin), Lifetime::Sporadic)},
    .outputs = {OutputSpec{OutputLabel{"keys"}, republishedOrigin, header::DataDescription{"AODMetadataKeys"}, 0, Lifetime::Sporadic},
                OutputSpec{OutputLabel{"vals"}, republishedOrigin, header::DataDescription{"AODMetadataVals"}, 0, Lifetime::Sporadic}},
    .algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTMetadataCollector", ctx),
  };
}
223242
} // namespace o2::framework

Framework/Core/src/CompletionPolicy.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ std::vector<CompletionPolicy>
2727
return {
2828
CompletionPolicyHelpers::consumeWhenAllOrdered("internal-dpl-aod-writer"),
2929
CompletionPolicyHelpers::consumeWhenAnyZeroCount("internal-dpl-injected-dummy-sink", [](DeviceSpec const& s) { return s.name.find("internal-dpl-injected-dummy-sink") != std::string::npos; }),
30+
CompletionPolicyHelpers::consumeWhenPastOldestPossibleTimeframe("internal-dpl-metadata-collector", [](DeviceSpec const& s) { return s.name == "internal-dpl-metadata-collector"; }),
3031
CompletionPolicyHelpers::consumeWhenAll()};
3132
}
3233

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
236236

237237
std::vector<InputSpec> requestedCCDBs;
238238
std::vector<OutputSpec> providedCCDBs;
239+
bool hasMetaOutput = false;
239240

240241
for (size_t wi = 0; wi < workflow.size(); ++wi) {
241242
auto& processor = workflow[wi];
@@ -392,6 +393,8 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
392393
} else {
393394
it->bindings.push_back(output.binding.value);
394395
}
396+
} else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"META"})) {
397+
hasMetaOutput = true;
395398
}
396399

397400
if (output.lifetime == Lifetime::Condition) {
@@ -583,6 +586,12 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
583586
extraSpecs.push_back(rootSink);
584587
}
585588

589+
// Inject a collector which merges all META messages and republishes them as
590+
// the AOD metadata keys/vals the AOD writer writes into the AO2D file.
591+
if (hasMetaOutput) {
592+
extraSpecs.push_back(AnalysisSupportHelpers::getMetadataCollectorSink(ctx));
593+
}
594+
586595
workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
587596
extraSpecs.clear();
588597

Framework/TestWorkflows/src/o2TestHistograms.cxx

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,18 @@
1616
#include "Framework/runDataProcessing.h"
1717
#include "Framework/AnalysisTask.h"
1818
#include <TH2F.h>
19+
#include <TMap.h>
20+
#include <TObjString.h>
21+
#include <TString.h>
1922

2023
using namespace o2;
2124
using namespace o2::framework;
2225
using namespace o2::framework::expressions;
2326

27+
// pT cut applied when producing the skimmed derived dataset; the same value is
28+
// published as run metadata so it lands in the derived AO2D's metaData object.
29+
static constexpr float kSkimPtCut = 1.5f;
30+
2431
namespace o2::aod
2532
{
2633
O2ORIGIN("EMB");
@@ -38,8 +45,8 @@ DECLARE_SOA_TABLE(SkimmedExampleTrack, "AOD", "SKIMEXTRK", //!
3845
struct EtaAndClsHistogramsSimple {
3946
OutputObj<TH2F> etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)};
4047
Produces<o2::aod::SkimmedExampleTrack> skimEx;
41-
Configurable<std::string> trackFilterString{"track-filter", "o2::aod::track::pt < 10.f", "Track filter string"};
42-
Filter trackFilter = o2::aod::track::pt < 10.f;
48+
Configurable<std::string> trackFilterString{"track-filter", "", "Track filter string (overrides the pT cut when set)"};
49+
Filter trackFilter = o2::aod::track::pt < kSkimPtCut;
4350

4451
HistogramRegistry registry{
4552
"registry",
@@ -88,8 +95,8 @@ struct EtaAndClsHistogramsSimple {
8895
struct EtaAndClsHistogramsIUSimple {
8996
OutputObj<TH2F> etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)};
9097
Produces<o2::aod::SkimmedExampleTrack> skimEx;
91-
Configurable<std::string> trackFilterString{"track-filter", "o2::aod::track::pt < 10.f", "Track filter string"};
92-
Filter trackFilter = o2::aod::track::pt < 10.f;
98+
Configurable<std::string> trackFilterString{"track-filter", "", "Track filter string (overrides the pT cut when set)"};
99+
Filter trackFilter = o2::aod::track::pt < kSkimPtCut;
93100

94101
HistogramRegistry registry{
95102
"registry",
@@ -136,8 +143,8 @@ struct EtaAndClsHistogramsFull {
136143
} //
137144
};
138145

139-
Configurable<std::string> trackFilterString{"track-filter", "o2::aod::track::pt < 10.f", "Track filter string"};
140-
Filter trackFilter = o2::aod::track::pt < 10.f;
146+
Configurable<std::string> trackFilterString{"track-filter", "", "Track filter string (overrides the pT cut when set)"};
147+
Filter trackFilter = o2::aod::track::pt < kSkimPtCut;
141148

142149
void init(InitContext&)
143150
{
@@ -183,25 +190,37 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
183190
LOGP(info, "- {} present.", table);
184191
}
185192
// Notice it's important for the tasks to use the same name, otherwise topology generation will be confused.
193+
WorkflowSpec specs;
186194
if (runType == "2" || !hasTrackCov) {
187195
LOGP(info, "Using only tracks {}", runType);
188196
if (hasTrackIU) {
189-
return WorkflowSpec{
190-
adaptAnalysisTask<EtaAndClsHistogramsIUSimple>(cfgc, TaskName{"simple-histos"}),
191-
};
197+
specs = WorkflowSpec{adaptAnalysisTask<EtaAndClsHistogramsIUSimple>(cfgc, TaskName{"simple-histos"})};
198+
} else {
199+
specs = WorkflowSpec{adaptAnalysisTask<EtaAndClsHistogramsSimple>(cfgc, TaskName{"simple-histos"})};
192200
}
193-
return WorkflowSpec{
194-
adaptAnalysisTask<EtaAndClsHistogramsSimple>(cfgc, TaskName{"simple-histos"}),
195-
};
196201
} else {
197202
LOGP(info, "Using tracks extra {}", runType);
198203
if (hasTrackIU) {
199-
return WorkflowSpec{
200-
adaptAnalysisTask<EtaAndClsHistogramsIUSimple>(cfgc, TaskName{"simple-histos"}),
201-
};
204+
specs = WorkflowSpec{adaptAnalysisTask<EtaAndClsHistogramsIUSimple>(cfgc, TaskName{"simple-histos"})};
205+
} else {
206+
specs = WorkflowSpec{adaptAnalysisTask<EtaAndClsHistogramsFull>(cfgc, TaskName{"simple-histos"})};
202207
}
203-
return WorkflowSpec{
204-
adaptAnalysisTask<EtaAndClsHistogramsFull>(cfgc, TaskName{"simple-histos"}),
205-
};
206208
}
209+
210+
// Publish the skimming pT cut as run metadata, once per data frame so it is
211+
// aligned with the derived tables. The auto-injected metadata collector merges
212+
// all META messages (oldest-possible completion) and the AOD writer stores the
213+
// result as the metaData object of the derived AO2D file.
214+
specs.push_back(DataProcessorSpec{
215+
.name = "skim-metadata",
216+
.inputs = {InputSpec{"tfn", "TFN", "TFNumber"}},
217+
.outputs = {OutputSpec{{"meta"}, "META", "SKIMINFO", 0, Lifetime::Sporadic}},
218+
.algorithm = adaptStateless([](ProcessingContext& pc) {
219+
TMap m;
220+
m.SetOwnerKeyValue();
221+
m.Add(new TObjString("SkimTrackPtCut"), new TObjString(TString::Format("%g", kSkimPtCut)));
222+
pc.outputs().snapshot(Output{"META", "SKIMINFO", 0}, m);
223+
}),
224+
});
225+
return specs;
207226
}

0 commit comments

Comments
 (0)