Skip to content

Commit 4af0618

Browse files
committed
TPC: add test workflow to create dummy CMV data
1 parent d961ddf commit 4af0618

2 files changed

Lines changed: 380 additions & 0 deletions

File tree

Detectors/TPC/workflow/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,11 @@ o2_add_executable(idc-test-ft
198198
SOURCES test/test_ft_EPN_Aggregator.cxx
199199
PUBLIC_LINK_LIBRARIES O2::TPCWorkflow)
200200

201+
o2_add_executable(cmv-test-generator
202+
COMPONENT_NAME tpc
203+
SOURCES test/test_cmv_generator.cxx
204+
PUBLIC_LINK_LIBRARIES O2::TPCWorkflow)
205+
201206
o2_add_executable(miptrack-filter
202207
COMPONENT_NAME tpc
203208
SOURCES src/tpc-miptrack-filter.cxx
Lines changed: 375 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,375 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
/// \file test_cmv_generator.cxx
13+
/// \brief DPL source workflow that generates dummy CMV data for testing the CMV FLP pipeline.
14+
///
15+
/// Replaces o2-tpc-cmv-to-vector in tests; directly emits CMVVECTOR and CMVORBITS
16+
/// messages per CRU per TF so the workflow can be piped straight into o2-tpc-cmv-flp:
17+
///
18+
/// o2-tpc-cmv-test-generator --crus 0-359 --timeframes 100 \
19+
/// | o2-tpc-cmv-flp --crus 0-359 --n-TFs-buffer 10 \
20+
/// | o2-dpl-output-proxy --dataspec "downstream:TPC/CMVGROUP;downstream:TPC/CMVORBITINFO" ...
21+
///
22+
/// \author Ernst Hellbar <ernst.hellbar@cern.ch>
23+
24+
#include "Framework/DataProcessorSpec.h"
25+
#include "Framework/Task.h"
26+
#include "Framework/ControlService.h"
27+
#include "Framework/ConfigParamRegistry.h"
28+
#include "Framework/ConfigParamSpec.h"
29+
#include "Framework/Logger.h"
30+
#include "Headers/DataHeader.h"
31+
#include "Algorithm/RangeTokenizer.h"
32+
#include "TPCBase/CRU.h"
33+
#include "DataFormatsTPC/CMV.h"
34+
#include "TPCCalibration/CMVHelper.h"
35+
#include "TPCCalibration/CMVContainer.h"
36+
#include "TPCWorkflow/ProcessingHelpers.h"
37+
#include "CommonUtils/TreeStreamRedirector.h"
38+
#include "CommonUtils/ConfigurableParam.h"
39+
#include "DetectorsRaw/HBFUtilsInitializer.h"
40+
#include "DetectorsRaw/HBFUtils.h"
41+
#include <fmt/format.h>
42+
#include <fmt/ranges.h>
43+
44+
#include <vector>
45+
#include <chrono>
46+
#include <thread>
47+
#include <cmath>
48+
#include <memory>
49+
#include <limits>
50+
#include <random>
51+
#include <stdexcept>
52+
#include <string>
53+
#include <unordered_set>
54+
55+
using namespace o2::framework;
56+
using o2::header::gDataOriginTPC;
57+
58+
// ─────────────────────────────────────────────────────────────────────────────
59+
// workflow options
60+
// ─────────────────────────────────────────────────────────────────────────────
61+
void customize(std::vector<ConfigParamSpec>& workflowOptions)
62+
{
63+
const std::string cruDefault = "0-" + std::to_string(o2::tpc::CRU::MaxCRU - 1);
64+
std::vector<ConfigParamSpec> options{
65+
{"crus", VariantType::String, cruDefault.c_str(), {"List of CRUs, comma-separated ranges, e.g. 0-3,7,9-15"}},
66+
{"timeframes", VariantType::Int, 100, {"Number of TFs to generate; use -1 to run indefinitely"}},
67+
{"delay", VariantType::Bool, false, {"Add delay after sending all CRUs"}},
68+
{"delayTime", VariantType::Int, 1, {"Duration of the global per-TF delay in ms (requires --delay true)"}},
69+
{"delayCRUs", VariantType::String, "", {"CRUs for which to add an extra per-CRU delay before sending, comma-separated ranges"}},
70+
{"delayTimeCRUs", VariantType::Int, 1, {"Duration of the per-CRU delay in ms (requires --delayCRUs)"}},
71+
{"dropTFsRandom", VariantType::Int, 0, {"Drop a whole TF randomly: on average one every N TFs (0 = disabled)"}},
72+
{"dropTFsRange", VariantType::String, "", {"Drop all TFs in this range, e.g. 10-12"}},
73+
{"seed", VariantType::Int, 42, {"RNG seed for CMV value generation"}},
74+
{"amplitude", VariantType::Float, 5.0f, {"Amplitude of the sinusoidal CMV signal (ADC units); ignored when --input-file is set"}},
75+
{"noise", VariantType::Float, 1.0f, {"Gaussian noise std-dev added per time bin (ADC units); used as the smearing width in --input-file mode"}},
76+
{"input-file", VariantType::String, "", {"ROOT file with a CMV 'ccdb_object' tree; the template TF (see --input-entry) is decoded once and re-emitted, smeared per generated TF. Empty = synthetic sinusoidal signal"}},
77+
{"input-entry", VariantType::Int, 0, {"Tree entry (TF index) used as the template when --input-file is set"}},
78+
{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}}};
79+
o2::raw::HBFUtilsInitializer::addConfigOption(options, "hbfutils");
80+
std::swap(workflowOptions, options);
81+
}
82+
83+
#include "Framework/runDataProcessing.h"
84+
85+
// ─────────────────────────────────────────────────────────────────────────────
86+
// generator device
87+
// ─────────────────────────────────────────────────────────────────────────────
88+
class CMVGeneratorDevice : public o2::framework::Task
89+
{
90+
public:
91+
static constexpr uint32_t sOrbitsPerPacket = 8; ///< each CMV packet covers 8 heartbeat orbits
92+
93+
CMVGeneratorDevice(const std::vector<uint32_t>& crus,
94+
const std::unordered_set<uint32_t>& delayCRUs,
95+
unsigned int maxTFs,
96+
bool delay,
97+
int delayTime,
98+
int delayTimeCRUs,
99+
int dropTFsRandom,
100+
const std::vector<int>& rangeTFsDrop,
101+
float amplitude,
102+
float noise,
103+
int seed,
104+
const std::string& inputFile,
105+
long long inputEntry)
106+
: mCRUs(crus), mDelayCRUs(delayCRUs), mMaxTFs(maxTFs), mDelay(delay), mDelayTime(delayTime), mDelayTimeCRUs(delayTimeCRUs), mDropTFsRandom(dropTFsRandom), mRangeTFsDrop(rangeTFsDrop), mAmplitude(amplitude), mNoise(noise), mRng(static_cast<std::mt19937::result_type>(seed)), mInputFileName(inputFile), mInputEntry(inputEntry) {}
107+
108+
void init(o2::framework::InitContext& ic) final
109+
{
110+
mTimer100TFs = std::chrono::high_resolution_clock::now();
111+
112+
if (!mCRUs.empty()) {
113+
LOGP(info, "crus: {}", fmt::join(mCRUs, ", "));
114+
}
115+
if (!mDelayCRUs.empty()) {
116+
const std::vector<uint32_t> delayCRUsSorted(mDelayCRUs.begin(), mDelayCRUs.end());
117+
LOGP(info, "delayCRUs: {}", fmt::join(delayCRUsSorted, ", "));
118+
}
119+
120+
mWriteDebug = ic.options().get<bool>("write-debug");
121+
if (mWriteDebug) {
122+
mDebugStreamFileName = ic.options().get<std::string>("debug-file-name");
123+
LOGP(info, "Creating debug stream {}", mDebugStreamFileName);
124+
mDebugStream = std::make_unique<o2::utils::TreeStreamRedirector>(mDebugStreamFileName.data(), "recreate");
125+
}
126+
127+
if (!mInputFileName.empty()) {
128+
o2::tpc::CMVFileHandle handle;
129+
if (!handle.open(mInputFileName)) {
130+
throw std::runtime_error("CMV generator: failed to open input file " + mInputFileName);
131+
}
132+
const auto nEntries = handle.tree->GetEntries();
133+
if (mInputEntry < 0 || mInputEntry >= nEntries) {
134+
const auto msg = fmt::format("CMV generator: --input-entry {} out of range [0, {}) in {}", mInputEntry, nEntries, mInputFileName);
135+
handle.close();
136+
throw std::runtime_error(msg);
137+
}
138+
const o2::tpc::CMVPerTF* tmpl = handle.getEntry(mInputEntry);
139+
if (!tmpl) {
140+
handle.close();
141+
throw std::runtime_error("CMV generator: failed to read/decode entry from " + mInputFileName);
142+
}
143+
mBaseCMVFloat.resize(mCRUs.size());
144+
for (size_t iCRU = 0; iCRU < mCRUs.size(); ++iCRU) {
145+
const auto cru = mCRUs[iCRU];
146+
auto& base = mBaseCMVFloat[iCRU];
147+
base.resize(o2::tpc::cmv::NTimeBinsPerTF);
148+
for (uint32_t tb = 0; tb < o2::tpc::cmv::NTimeBinsPerTF; ++tb) {
149+
base[tb] = tmpl->getCMVFloat(static_cast<int>(cru), static_cast<int>(tb));
150+
}
151+
}
152+
handle.close();
153+
mUseInputFile = true;
154+
LOGP(info, "Loaded CMV template from {} (entry {}): {} CRUs x {} bins, smearing with Gaussian sigma {} ADC per generated TF",
155+
mInputFileName, mInputEntry, mCRUs.size(), o2::tpc::cmv::NTimeBinsPerTF, mNoise);
156+
}
157+
}
158+
159+
void run(o2::framework::ProcessingContext& ctx) final
160+
{
161+
using timer = std::chrono::high_resolution_clock;
162+
const auto tf = o2::tpc::processing_helpers::getCurrentTF(ctx);
163+
164+
// ── TF dropping ──────────────────────────────────────────────────────────
165+
// Note: RangeTokenizer guarantees sorted output, so front()/back() are min/max.
166+
if (!mRangeTFsDrop.empty() && tf >= static_cast<uint32_t>(mRangeTFsDrop.front()) && tf <= static_cast<uint32_t>(mRangeTFsDrop.back())) {
167+
LOGP(info, "Dropping TF {} (range drop)", tf);
168+
return;
169+
}
170+
if (mDropTFsRandom > 0 && std::uniform_int_distribution<int>{0, mDropTFsRandom - 1}(mRng) == 0) {
171+
LOGP(info, "Dropping TF {} (random drop)", tf);
172+
return;
173+
}
174+
175+
auto start = timer::now();
176+
177+
// ── CMV values ───────────────────────────────────────────────────────────
178+
// NTimeBinsPerTF = NPacketsPerTFPerCRU (4) * NTimeBinsPerPacket (3564) = 14256
179+
// - synthetic mode: shared cmvVec = sinusoidal signal + noise (same for all CRUs)
180+
// - input-file mode: per-CRU template + the shared noise vector
181+
const bool addNoise = (mNoise > 0.f); // skip all RNG when --noise 0
182+
std::normal_distribution<float> noiseDist{0.f, mNoise};
183+
std::vector<uint16_t> cmvVec(o2::tpc::cmv::NTimeBinsPerTF);
184+
std::vector<float> noiseVec; // only populated in input-file mode when noise is enabled
185+
if (mUseInputFile) {
186+
if (addNoise) {
187+
noiseVec.resize(o2::tpc::cmv::NTimeBinsPerTF);
188+
for (auto& n : noiseVec) {
189+
n = noiseDist(mRng);
190+
}
191+
}
192+
} else {
193+
const float signal = -std::abs(mAmplitude * std::sin(tf * 0.05f));
194+
for (auto& v : cmvVec) {
195+
o2::tpc::cmv::Data d;
196+
d.setCMVFloat(addNoise ? (signal + noiseDist(mRng)) : signal);
197+
v = d.getCMV();
198+
}
199+
}
200+
201+
// ── Orbit / BC info (same for all CRUs) ──────────────────────────────────
202+
// One packed (orbit<<32|bc) entry per CMV packet (4 per TF).
203+
// Each packet covers 8 heartbeat orbits (NTimeBinsPerPacket = 3564 = 8 LHC orbits),
204+
// so the orbit advances by 8 per packet and by NPacketsPerTFPerCRU*8 = 32 per TF.
205+
std::vector<uint64_t> orbitBCVec(o2::tpc::cmv::NPacketsPerTFPerCRU);
206+
for (uint32_t pkt = 0; pkt < o2::tpc::cmv::NPacketsPerTFPerCRU; ++pkt) {
207+
const uint32_t orbit = static_cast<uint32_t>(tf * o2::tpc::cmv::NPacketsPerTFPerCRU * sOrbitsPerPacket + pkt * sOrbitsPerPacket);
208+
orbitBCVec[pkt] = uint64_t(orbit) << 32; // bc = 0
209+
}
210+
211+
for (size_t iCRU = 0; iCRU < mCRUs.size(); ++iCRU) {
212+
const auto cru = mCRUs[iCRU];
213+
const o2::header::DataHeader::SubSpecificationType subSpec{cru << 7};
214+
215+
// ── per-CRU delay ────────────────────────────────────────────────────
216+
if (mDelayCRUs.count(cru)) {
217+
std::this_thread::sleep_for(std::chrono::milliseconds(mDelayTimeCRUs));
218+
}
219+
220+
// ── input-file mode: smear this CRU's real template with the shared noise ─
221+
if (mUseInputFile) {
222+
const auto& base = mBaseCMVFloat[iCRU];
223+
for (uint32_t tb = 0; tb < o2::tpc::cmv::NTimeBinsPerTF; ++tb) {
224+
o2::tpc::cmv::Data d;
225+
d.setCMVFloat(addNoise ? (base[tb] + noiseVec[tb]) : base[tb]);
226+
cmvVec[tb] = d.getCMV();
227+
}
228+
}
229+
230+
ctx.outputs().snapshot(Output{gDataOriginTPC, "CMVVECTOR", subSpec}, cmvVec);
231+
ctx.outputs().snapshot(Output{gDataOriginTPC, "CMVORBITS", subSpec}, orbitBCVec);
232+
233+
if (mWriteDebug) {
234+
auto& stream = (*mDebugStream) << "cmvs";
235+
stream << "cru=" << cru
236+
<< "tfCounter=" << tf
237+
<< "nCMVs=" << cmvVec.size()
238+
<< "cmvs=" << cmvVec
239+
<< "\n";
240+
}
241+
}
242+
243+
if (!(tf % 100)) {
244+
const auto elapsed100 = std::chrono::duration_cast<std::chrono::milliseconds>(timer::now() - mTimer100TFs).count();
245+
LOGP(info, "Generated CMV data for TF {} ({} ms for last 100 TFs)", tf, elapsed100);
246+
mTimer100TFs = timer::now();
247+
}
248+
249+
// ── global delay ─────────────────────────────────────────────────────────
250+
if (mDelay) {
251+
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(timer::now() - start).count();
252+
if (elapsed < mDelayTime) {
253+
std::this_thread::sleep_for(std::chrono::milliseconds(mDelayTime - elapsed));
254+
}
255+
}
256+
257+
// endOfStream() propagates the EoS signal to downstream devices (required for source devices).
258+
if (mMaxTFs != std::numeric_limits<unsigned int>::max() && tf >= mMaxTFs - 1) {
259+
ctx.services().get<ControlService>().endOfStream();
260+
ctx.services().get<ControlService>().readyToQuit(QuitRequest::Me);
261+
}
262+
}
263+
264+
void endOfStream(o2::framework::EndOfStreamContext&) final { closeFiles(); }
265+
void stop() final { closeFiles(); }
266+
267+
private:
268+
void closeFiles()
269+
{
270+
if (mDebugStream) {
271+
auto& stream = (*mDebugStream) << "cmvs";
272+
auto& tree = stream.getTree();
273+
tree.SetAlias("sector", "int(cru/10)");
274+
mDebugStream->Close();
275+
mDebugStream.reset(nullptr);
276+
}
277+
}
278+
279+
const std::vector<uint32_t> mCRUs{};
280+
const std::unordered_set<uint32_t> mDelayCRUs{};
281+
const unsigned int mMaxTFs{};
282+
const bool mDelay{false};
283+
const int mDelayTime{1};
284+
const int mDelayTimeCRUs{1};
285+
const int mDropTFsRandom{0};
286+
const std::vector<int> mRangeTFsDrop{};
287+
const float mAmplitude{5.f};
288+
const float mNoise{1.f};
289+
std::mt19937 mRng{};
290+
const std::string mInputFileName{}; ///< CMV ROOT file to use as template ("" = synthetic mode)
291+
const long long mInputEntry{0}; ///< tree entry (TF) used as template
292+
bool mUseInputFile{false}; ///< true once a template has been loaded
293+
std::vector<std::vector<float>> mBaseCMVFloat; ///< decoded template CMV values [iCRU][timeBin], aligned to mCRUs
294+
std::chrono::high_resolution_clock::time_point mTimer100TFs{};
295+
bool mWriteDebug{false};
296+
std::string mDebugStreamFileName{};
297+
std::unique_ptr<o2::utils::TreeStreamRedirector> mDebugStream{};
298+
};
299+
300+
// ─────────────────────────────────────────────────────────────────────────────
301+
DataProcessorSpec generateCMVsCRU(const std::vector<uint32_t>& crus,
302+
const std::unordered_set<uint32_t>& delayCRUs,
303+
unsigned int maxTFs,
304+
bool delay,
305+
int delayTime,
306+
int delayTimeCRUs,
307+
int dropTFsRandom,
308+
const std::vector<int>& rangeTFsDrop,
309+
float amplitude,
310+
float noise,
311+
int seed,
312+
const std::string& inputFile,
313+
long long inputEntry)
314+
{
315+
std::vector<OutputSpec> outputSpecs;
316+
outputSpecs.reserve(crus.size() * 2);
317+
for (const auto cru : crus) {
318+
const o2::header::DataHeader::SubSpecificationType subSpec{cru << 7};
319+
outputSpecs.emplace_back(gDataOriginTPC, "CMVVECTOR", subSpec, Lifetime::Timeframe);
320+
outputSpecs.emplace_back(gDataOriginTPC, "CMVORBITS", subSpec, Lifetime::Timeframe);
321+
}
322+
323+
return DataProcessorSpec{
324+
"tpc-cmv-generator",
325+
Inputs{},
326+
outputSpecs,
327+
AlgorithmSpec{adaptFromTask<CMVGeneratorDevice>(crus, delayCRUs, maxTFs, delay, delayTime, delayTimeCRUs, dropTFsRandom, rangeTFsDrop, amplitude, noise, seed, inputFile, inputEntry)},
328+
Options{
329+
{"write-debug", VariantType::Bool, false, {"Write a debug output tree"}},
330+
{"debug-file-name", VariantType::String, "./cmv_generator_debug.root", {"Name of the debug output file"}},
331+
}};
332+
}
333+
334+
// ─────────────────────────────────────────────────────────────────────────────
335+
WorkflowSpec defineDataProcessing(ConfigContext const& config)
336+
{
337+
const auto tpcCRUs = o2::RangeTokenizer::tokenize<int>(config.options().get<std::string>("crus"));
338+
const std::vector<uint32_t> crus(tpcCRUs.begin(), tpcCRUs.end());
339+
340+
const auto delayCRUsStr = config.options().get<std::string>("delayCRUs");
341+
std::unordered_set<uint32_t> delayCRUs;
342+
if (!delayCRUsStr.empty()) {
343+
for (const auto cru : o2::RangeTokenizer::tokenize<int>(delayCRUsStr)) {
344+
delayCRUs.insert(static_cast<uint32_t>(cru));
345+
}
346+
}
347+
348+
const auto dropTFsRangeStr = config.options().get<std::string>("dropTFsRange");
349+
const auto rangeTFsDrop = dropTFsRangeStr.empty() ? std::vector<int>{} : o2::RangeTokenizer::tokenize<int>(dropTFsRangeStr);
350+
const int timeframesInt = config.options().get<int>("timeframes");
351+
// -1 means run indefinitely; map to UINT_MAX so the termination check never fires.
352+
const auto timeframes = (timeframesInt < 0) ? std::numeric_limits<unsigned int>::max() : static_cast<unsigned int>(timeframesInt);
353+
const auto delay = config.options().get<bool>("delay");
354+
const auto delayTime = config.options().get<int>("delayTime");
355+
const auto delayTimeCRUs = config.options().get<int>("delayTimeCRUs");
356+
const auto dropTFsRandom = config.options().get<int>("dropTFsRandom");
357+
const auto seed = config.options().get<int>("seed");
358+
const auto amplitude = config.options().get<float>("amplitude");
359+
const auto noise = config.options().get<float>("noise");
360+
const auto inputFile = config.options().get<std::string>("input-file");
361+
const auto inputEntry = static_cast<long long>(config.options().get<int>("input-entry"));
362+
363+
o2::conf::ConfigurableParam::updateFromString(config.options().get<std::string>("configKeyValues"));
364+
365+
WorkflowSpec workflow;
366+
workflow.emplace_back(generateCMVsCRU(crus, delayCRUs, timeframes, delay, delayTime, delayTimeCRUs, dropTFsRandom, rangeTFsDrop, amplitude, noise, seed, inputFile, inputEntry));
367+
368+
auto& hbfu = o2::raw::HBFUtils::Instance();
369+
long startTime = hbfu.startTime > 0 ? hbfu.startTime : std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
370+
o2::conf::ConfigurableParam::updateFromString(fmt::format("HBFUtils.startTime={}", startTime).data());
371+
o2::conf::ConfigurableParam::updateFromString(fmt::format("HBFUtils.nHBFPerTF={}", hbfu.nHBFPerTF).data());
372+
o2::raw::HBFUtilsInitializer hbfIni(config, workflow);
373+
374+
return workflow;
375+
}

0 commit comments

Comments
 (0)