Skip to content

Commit 8eef8f7

Browse files
committed
TPC: add test workflow to create dummy CMV data
1 parent d961ddf commit 8eef8f7

2 files changed

Lines changed: 293 additions & 0 deletions

File tree

Detectors/TPC/workflow/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,11 @@ o2_add_executable(idc-test-ft
198198
SOURCES test/test_ft_EPN_Aggregator.cxx
199199
PUBLIC_LINK_LIBRARIES O2::TPCWorkflow)
200200

201+
o2_add_executable(cmv-test-generator
202+
COMPONENT_NAME tpc
203+
SOURCES test/test_cmv_generator.cxx
204+
PUBLIC_LINK_LIBRARIES O2::TPCWorkflow)
205+
201206
o2_add_executable(miptrack-filter
202207
COMPONENT_NAME tpc
203208
SOURCES src/tpc-miptrack-filter.cxx
Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
/// \file test_cmv_generator.cxx
13+
/// \brief DPL source workflow that generates dummy CMV data for testing the CMV FLP pipeline.
14+
///
15+
/// Replaces o2-tpc-cmv-to-vector in tests; directly emits CMVVECTOR and CMVORBITS
16+
/// messages per CRU per TF so the workflow can be piped straight into o2-tpc-cmv-flp:
17+
///
18+
/// o2-tpc-cmv-test-generator --crus 0-359 --timeframes 100 \
19+
/// | o2-tpc-cmv-flp --crus 0-359 --n-TFs-buffer 10 \
20+
/// | o2-dpl-output-proxy --dataspec "downstream:TPC/CMVGROUP;downstream:TPC/CMVORBITINFO" ...
21+
///
22+
/// \author Ernst Hellbar <ernst.hellbar@cern.ch>
23+
24+
#include "Framework/DataProcessorSpec.h"
25+
#include "Framework/Task.h"
26+
#include "Framework/ControlService.h"
27+
#include "Framework/ConfigParamRegistry.h"
28+
#include "Framework/ConfigParamSpec.h"
29+
#include "Framework/Logger.h"
30+
#include "Headers/DataHeader.h"
31+
#include "Algorithm/RangeTokenizer.h"
32+
#include "TPCBase/CRU.h"
33+
#include "DataFormatsTPC/CMV.h"
34+
#include "TPCWorkflow/ProcessingHelpers.h"
35+
#include "CommonUtils/TreeStreamRedirector.h"
36+
#include "DetectorsRaw/HBFUtilsInitializer.h"
37+
#include "DetectorsRaw/HBFUtils.h"
38+
#include <fmt/format.h>
39+
#include <fmt/ranges.h>
40+
41+
#include <vector>
42+
#include <chrono>
43+
#include <thread>
44+
#include <cmath>
45+
#include <memory>
46+
#include <random>
47+
#include <unordered_set>
48+
49+
using namespace o2::framework;
50+
using o2::header::gDataOriginTPC;
51+
52+
// ─────────────────────────────────────────────────────────────────────────────
53+
// workflow options
54+
// ─────────────────────────────────────────────────────────────────────────────
55+
void customize(std::vector<ConfigParamSpec>& workflowOptions)
56+
{
57+
const std::string cruDefault = "0-" + std::to_string(o2::tpc::CRU::MaxCRU - 1);
58+
std::vector<ConfigParamSpec> options{
59+
{"crus", VariantType::String, cruDefault.c_str(), {"List of CRUs, comma-separated ranges, e.g. 0-3,7,9-15"}},
60+
{"timeframes", VariantType::Int, 100, {"Number of TFs to generate; use -1 to run indefinitely"}},
61+
{"delay", VariantType::Bool, false, {"Add delay after sending all CRUs"}},
62+
{"delayTime", VariantType::Int, 1, {"Duration of the global per-TF delay in ms (requires --delay true)"}},
63+
{"delayCRUs", VariantType::String, "", {"CRUs for which to add an extra per-CRU delay before sending, comma-separated ranges"}},
64+
{"delayTimeCRUs", VariantType::Int, 1, {"Duration of the per-CRU delay in ms (requires --delayCRUs)"}},
65+
{"dropTFsRandom", VariantType::Int, 0, {"Drop a whole TF randomly: on average one every N TFs (0 = disabled)"}},
66+
{"dropTFsRange", VariantType::String, "", {"Drop all TFs in this range, e.g. 10-12"}},
67+
{"seed", VariantType::Int, 42, {"RNG seed for CMV value generation"}},
68+
{"amplitude", VariantType::Float, 5.0f, {"Amplitude of the sinusoidal CMV signal (ADC units)"}},
69+
{"noise", VariantType::Float, 1.0f, {"Gaussian noise std-dev added per time bin (ADC units)"}},
70+
{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}}};
71+
o2::raw::HBFUtilsInitializer::addConfigOption(options);
72+
std::swap(workflowOptions, options);
73+
}
74+
75+
#include "Framework/runDataProcessing.h"
76+
77+
// ─────────────────────────────────────────────────────────────────────────────
78+
// generator device
79+
// ─────────────────────────────────────────────────────────────────────────────
80+
class CMVGeneratorDevice : public o2::framework::Task
81+
{
82+
public:
83+
static constexpr uint32_t sOrbitsPerPacket = 8; ///< each CMV packet covers 8 heartbeat orbits
84+
85+
CMVGeneratorDevice(const std::vector<uint32_t>& crus,
86+
const std::unordered_set<uint32_t>& delayCRUs,
87+
unsigned int maxTFs,
88+
bool delay,
89+
int delayTime,
90+
int delayTimeCRUs,
91+
int dropTFsRandom,
92+
const std::vector<int>& rangeTFsDrop,
93+
float amplitude,
94+
float noise,
95+
int seed)
96+
: mCRUs(crus), mDelayCRUs(delayCRUs), mMaxTFs(maxTFs), mDelay(delay), mDelayTime(delayTime), mDelayTimeCRUs(delayTimeCRUs), mDropTFsRandom(dropTFsRandom), mRangeTFsDrop(rangeTFsDrop), mAmplitude(amplitude), mNoise(noise), mRng(static_cast<std::mt19937::result_type>(seed)) {}
97+
98+
void init(o2::framework::InitContext& ic) final
99+
{
100+
if (!mCRUs.empty()) {
101+
LOGP(info, "crus: {}", fmt::join(mCRUs, ", "));
102+
}
103+
if (!mDelayCRUs.empty()) {
104+
const std::vector<uint32_t> delayCRUsSorted(mDelayCRUs.begin(), mDelayCRUs.end());
105+
LOGP(info, "delayCRUs: {}", fmt::join(delayCRUsSorted, ", "));
106+
}
107+
108+
mWriteDebug = ic.options().get<bool>("write-debug");
109+
if (mWriteDebug) {
110+
mDebugStreamFileName = ic.options().get<std::string>("debug-file-name");
111+
LOGP(info, "Creating debug stream {}", mDebugStreamFileName);
112+
mDebugStream = std::make_unique<o2::utils::TreeStreamRedirector>(mDebugStreamFileName.data(), "recreate");
113+
}
114+
}
115+
116+
void run(o2::framework::ProcessingContext& ctx) final
117+
{
118+
using timer = std::chrono::high_resolution_clock;
119+
const auto tf = o2::tpc::processing_helpers::getCurrentTF(ctx);
120+
121+
// ── TF dropping ──────────────────────────────────────────────────────────
122+
if (!mRangeTFsDrop.empty() && tf >= (uint32_t)mRangeTFsDrop.front() && tf <= (uint32_t)mRangeTFsDrop.back()) {
123+
LOGP(info, "Dropping TF {} (range drop)", tf);
124+
return;
125+
}
126+
if (mDropTFsRandom > 0 && std::uniform_int_distribution<int>{0, mDropTFsRandom - 1}(mRng) == 0) {
127+
LOGP(info, "Dropping TF {} (random drop)", tf);
128+
return;
129+
}
130+
131+
auto start = timer::now();
132+
static auto sTimer100TFs = start;
133+
134+
// ── CMV values (generated once per TF, reused for all CRUs) ─────────────
135+
// NTimeBinsPerTF = NPacketsPerTFPerCRU (4) * NTimeBinsPerPacket (3564) = 14256
136+
// Using std::mt19937 + std::normal_distribution for fast noise generation.
137+
// All CRUs share the same noise realization; the sinusoidal signal is common too.
138+
const float signal = mAmplitude * std::sin(tf * 0.05f);
139+
std::normal_distribution<float> noiseDist{0.f, mNoise};
140+
std::vector<uint16_t> cmvVec(o2::tpc::cmv::NTimeBinsPerTF);
141+
for (auto& v : cmvVec) {
142+
o2::tpc::cmv::Data d;
143+
d.setCMVFloat(signal + noiseDist(mRng));
144+
v = d.getCMV();
145+
}
146+
147+
// ── Orbit / BC info (same for all CRUs) ──────────────────────────────────
148+
// One packed (orbit<<32|bc) entry per CMV packet (4 per TF).
149+
// Each packet covers 8 heartbeat orbits (NTimeBinsPerPacket = 3564 = 8 LHC orbits),
150+
// so the orbit advances by 8 per packet and by NPacketsPerTFPerCRU*8 = 32 per TF.
151+
std::vector<uint64_t> orbitBCVec(o2::tpc::cmv::NPacketsPerTFPerCRU);
152+
for (uint32_t pkt = 0; pkt < o2::tpc::cmv::NPacketsPerTFPerCRU; ++pkt) {
153+
const uint32_t orbit = static_cast<uint32_t>(tf * o2::tpc::cmv::NPacketsPerTFPerCRU * sOrbitsPerPacket + pkt * sOrbitsPerPacket);
154+
orbitBCVec[pkt] = uint64_t(orbit) << 32; // bc = 0
155+
}
156+
157+
for (const auto cru : mCRUs) {
158+
const o2::header::DataHeader::SubSpecificationType subSpec{cru << 7};
159+
160+
// ── per-CRU delay ────────────────────────────────────────────────────
161+
if (mDelayCRUs.count(cru)) {
162+
std::this_thread::sleep_for(std::chrono::milliseconds(mDelayTimeCRUs));
163+
}
164+
165+
ctx.outputs().snapshot(Output{gDataOriginTPC, "CMVVECTOR", subSpec}, cmvVec);
166+
ctx.outputs().snapshot(Output{gDataOriginTPC, "CMVORBITS", subSpec}, orbitBCVec);
167+
168+
if (mWriteDebug) {
169+
auto& stream = (*mDebugStream) << "cmvs";
170+
stream << "cru=" << cru
171+
<< "tfCounter=" << tf
172+
<< "nCMVs=" << cmvVec.size()
173+
<< "cmvs=" << cmvVec
174+
<< "\n";
175+
}
176+
}
177+
178+
if (!(tf % 100)) {
179+
const auto elapsed100 = std::chrono::duration_cast<std::chrono::milliseconds>(timer::now() - sTimer100TFs).count();
180+
LOGP(info, "Generated CMV data for TF {} ({} ms for last 100 TFs)", tf, elapsed100);
181+
sTimer100TFs = timer::now();
182+
}
183+
184+
// ── global delay ─────────────────────────────────────────────────────────
185+
if (mDelay) {
186+
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(timer::now() - start).count();
187+
if (elapsed < mDelayTime) {
188+
std::this_thread::sleep_for(std::chrono::milliseconds(mDelayTime - elapsed));
189+
}
190+
}
191+
192+
if (tf >= mMaxTFs - 1) {
193+
ctx.services().get<ControlService>().endOfStream();
194+
ctx.services().get<ControlService>().readyToQuit(QuitRequest::Me);
195+
}
196+
}
197+
198+
void endOfStream(o2::framework::EndOfStreamContext&) final { closeFiles(); }
199+
void stop() final { closeFiles(); }
200+
201+
private:
202+
void closeFiles()
203+
{
204+
if (mDebugStream) {
205+
auto& stream = (*mDebugStream) << "cmvs";
206+
auto& tree = stream.getTree();
207+
tree.SetAlias("sector", "int(cru/10)");
208+
mDebugStream->Close();
209+
mDebugStream.reset(nullptr);
210+
}
211+
}
212+
213+
const std::vector<uint32_t> mCRUs{};
214+
const std::unordered_set<uint32_t> mDelayCRUs{};
215+
const unsigned int mMaxTFs{};
216+
const bool mDelay{false};
217+
const int mDelayTime{1};
218+
const int mDelayTimeCRUs{1};
219+
const int mDropTFsRandom{0};
220+
const std::vector<int> mRangeTFsDrop{};
221+
const float mAmplitude{5.f};
222+
const float mNoise{1.f};
223+
std::mt19937 mRng{};
224+
bool mWriteDebug{false};
225+
std::string mDebugStreamFileName{};
226+
std::unique_ptr<o2::utils::TreeStreamRedirector> mDebugStream{};
227+
};
228+
229+
// ─────────────────────────────────────────────────────────────────────────────
230+
DataProcessorSpec generateCMVsCRU(const std::vector<uint32_t>& crus,
231+
const std::unordered_set<uint32_t>& delayCRUs,
232+
unsigned int maxTFs,
233+
bool delay,
234+
int delayTime,
235+
int delayTimeCRUs,
236+
int dropTFsRandom,
237+
const std::vector<int>& rangeTFsDrop,
238+
float amplitude,
239+
float noise,
240+
int seed)
241+
{
242+
std::vector<OutputSpec> outputSpecs;
243+
outputSpecs.reserve(crus.size() * 2);
244+
for (const auto cru : crus) {
245+
const o2::header::DataHeader::SubSpecificationType subSpec{cru << 7};
246+
outputSpecs.emplace_back(gDataOriginTPC, "CMVVECTOR", subSpec, Lifetime::Timeframe);
247+
outputSpecs.emplace_back(gDataOriginTPC, "CMVORBITS", subSpec, Lifetime::Timeframe);
248+
}
249+
250+
return DataProcessorSpec{
251+
"tpc-cmv-generator",
252+
Inputs{},
253+
outputSpecs,
254+
AlgorithmSpec{adaptFromTask<CMVGeneratorDevice>(crus, delayCRUs, maxTFs, delay, delayTime, delayTimeCRUs, dropTFsRandom, rangeTFsDrop, amplitude, noise, seed)},
255+
Options{
256+
{"write-debug", VariantType::Bool, false, {"Write a debug output tree"}},
257+
{"debug-file-name", VariantType::String, "./cmv_generator_debug.root", {"Name of the debug output file"}},
258+
}};
259+
}
260+
261+
// ─────────────────────────────────────────────────────────────────────────────
262+
WorkflowSpec defineDataProcessing(ConfigContext const& config)
263+
{
264+
const auto tpcCRUs = o2::RangeTokenizer::tokenize<int>(config.options().get<std::string>("crus"));
265+
const std::vector<uint32_t> crus(tpcCRUs.begin(), tpcCRUs.end());
266+
267+
const auto delayCRUsStr = config.options().get<std::string>("delayCRUs");
268+
std::unordered_set<uint32_t> delayCRUs;
269+
if (!delayCRUsStr.empty()) {
270+
for (const auto cru : o2::RangeTokenizer::tokenize<int>(delayCRUsStr)) {
271+
delayCRUs.insert(static_cast<uint32_t>(cru));
272+
}
273+
}
274+
275+
const auto rangeTFsDrop = o2::RangeTokenizer::tokenize<int>(config.options().get<std::string>("dropTFsRange"));
276+
const auto timeframes = static_cast<unsigned int>(config.options().get<int>("timeframes"));
277+
const auto delay = config.options().get<bool>("delay");
278+
const auto delayTime = config.options().get<int>("delayTime");
279+
const auto delayTimeCRUs = config.options().get<int>("delayTimeCRUs");
280+
const auto dropTFsRandom = config.options().get<int>("dropTFsRandom");
281+
const auto seed = config.options().get<int>("seed");
282+
const auto amplitude = config.options().get<float>("amplitude");
283+
const auto noise = config.options().get<float>("noise");
284+
285+
WorkflowSpec workflow;
286+
workflow.emplace_back(generateCMVsCRU(crus, delayCRUs, timeframes, delay, delayTime, delayTimeCRUs, dropTFsRandom, rangeTFsDrop, amplitude, noise, seed));
287+
return workflow;
288+
}

0 commit comments

Comments
 (0)