Skip to content

Commit bf9fe36

Browse files
committed
TPC: add test workflow to create dummy CMV data
1 parent d961ddf commit bf9fe36

2 files changed

Lines changed: 209 additions & 0 deletions

File tree

Detectors/TPC/workflow/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,11 @@ o2_add_executable(idc-test-ft
198198
SOURCES test/test_ft_EPN_Aggregator.cxx
199199
PUBLIC_LINK_LIBRARIES O2::TPCWorkflow)
200200

201+
o2_add_executable(cmv-test-generator
202+
COMPONENT_NAME tpc
203+
SOURCES test/test_cmv_generator.cxx
204+
PUBLIC_LINK_LIBRARIES O2::TPCWorkflow)
205+
201206
o2_add_executable(miptrack-filter
202207
COMPONENT_NAME tpc
203208
SOURCES src/tpc-miptrack-filter.cxx
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
/// \file test_cmv_generator.cxx
13+
/// \brief DPL source workflow that generates dummy CMV data for testing the CMV FLP pipeline.
14+
///
15+
/// Replaces o2-tpc-cmv-to-vector in tests; directly emits CMVVECTOR and CMVORBITS
16+
/// messages per CRU per TF so the workflow can be piped straight into o2-tpc-cmv-flp:
17+
///
18+
/// o2-tpc-cmv-test-generator --crus 0-359 --timeframes 100 \
19+
/// | o2-tpc-cmv-flp --crus 0-359 --n-TFs-buffer 10 \
20+
/// | o2-dpl-output-proxy --dataspec "downstream:TPC/CMVGROUP;downstream:TPC/CMVORBITINFO" ...
21+
///
22+
/// \author Ernst Hellbar <ernst.hellbar@cern.ch>
23+
24+
#include "Framework/DataProcessorSpec.h"
25+
#include "Framework/Task.h"
26+
#include "Framework/ControlService.h"
27+
#include "Framework/ConfigParamSpec.h"
28+
#include "Framework/Logger.h"
29+
#include "Headers/DataHeader.h"
30+
#include "Algorithm/RangeTokenizer.h"
31+
#include "TPCBase/CRU.h"
32+
#include "DataFormatsTPC/CMV.h"
33+
#include "TPCWorkflow/ProcessingHelpers.h"
34+
#include "TRandom.h"
35+
#include <fmt/format.h>
36+
#include <fmt/ranges.h>
37+
38+
#include <vector>
39+
#include <chrono>
40+
#include <thread>
41+
#include <cmath>
42+
#include <unordered_set>
43+
44+
using namespace o2::framework;
45+
using o2::header::gDataOriginTPC;
46+
47+
// ─────────────────────────────────────────────────────────────────────────────
48+
// workflow options
49+
// ─────────────────────────────────────────────────────────────────────────────
50+
void customize(std::vector<ConfigParamSpec>& workflowOptions)
51+
{
52+
const std::string cruDefault = "0-" + std::to_string(o2::tpc::CRU::MaxCRU - 1);
53+
std::vector<ConfigParamSpec> options{
54+
{"crus", VariantType::String, cruDefault.c_str(), {"List of CRUs, comma-separated ranges, e.g. 0-3,7,9-15"}},
55+
{"timeframes", VariantType::Int, 100, {"Number of TFs to generate"}},
56+
{"delay", VariantType::Bool, false, {"Add delay after sending all CRUs"}},
57+
{"delayTime", VariantType::Int, 1, {"Duration of the global per-TF delay in ms (requires --delay true)"}},
58+
{"delayCRUs", VariantType::String, "", {"CRUs for which to add an extra per-CRU delay before sending, comma-separated ranges"}},
59+
{"delayTimeCRUs", VariantType::Int, 1, {"Duration of the per-CRU delay in ms (requires --delayCRUs)"}},
60+
{"dropTFsRandom", VariantType::Int, 0, {"Drop a whole TF randomly: on average one every N TFs (0 = disabled)"}},
61+
{"dropTFsRange", VariantType::String, "", {"Drop all TFs in this range, e.g. 10-12"}},
62+
{"seed", VariantType::Int, 42, {"RNG seed for CMV value generation"}},
63+
{"amplitude", VariantType::Float, 5.0f, {"Amplitude of the sinusoidal CMV signal (ADC units)"}},
64+
{"noise", VariantType::Float, 1.0f, {"Gaussian noise std-dev added per time bin (ADC units)"}},
65+
};
66+
std::swap(workflowOptions, options);
67+
}
68+
69+
#include "Framework/runDataProcessing.h"
70+
71+
// ─────────────────────────────────────────────────────────────────────────────
72+
// generator device
73+
// ─────────────────────────────────────────────────────────────────────────────
74+
DataProcessorSpec generateCMVsCRU(const std::vector<uint32_t>& crus,
75+
const std::unordered_set<uint32_t>& delayCRUs,
76+
unsigned int maxTFs,
77+
bool delay,
78+
int delayTime,
79+
int delayTimeCRUs,
80+
int dropTFsRandom,
81+
const std::vector<int>& rangeTFsDrop,
82+
float amplitude,
83+
float noise)
84+
{
85+
using timer = std::chrono::high_resolution_clock;
86+
87+
std::vector<OutputSpec> outputSpecs;
88+
outputSpecs.reserve(crus.size() * 2);
89+
for (const auto cru : crus) {
90+
const o2::header::DataHeader::SubSpecificationType subSpec{cru << 7};
91+
outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, "CMVVECTOR", subSpec}, Lifetime::Timeframe);
92+
outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, "CMVORBITS", subSpec}, Lifetime::Timeframe);
93+
}
94+
95+
return DataProcessorSpec{
96+
"tpc-cmv-generator",
97+
Inputs{},
98+
outputSpecs,
99+
AlgorithmSpec{
100+
[maxTFs, delay, delayTime, delayTimeCRUs, dropTFsRandom, rangeTFsDrop, amplitude, noise, crus, delayCRUs](ProcessingContext& ctx) {
101+
const auto tf = o2::tpc::processing_helpers::getCurrentTF(ctx);
102+
103+
// ── TF dropping ──────────────────────────────────────────────────────
104+
if (!rangeTFsDrop.empty() && tf >= (uint32_t)rangeTFsDrop.front() && tf <= (uint32_t)rangeTFsDrop.back()) {
105+
LOGP(info, "Dropping TF {} (range drop)", tf);
106+
return;
107+
}
108+
if (dropTFsRandom > 0 && !gRandom->Integer(dropTFsRandom)) {
109+
LOGP(info, "Dropping TF {} (random drop)", tf);
110+
return;
111+
}
112+
113+
auto start = timer::now();
114+
115+
// FIXME: Slow sinusoidal baseline that drifts across TFs
116+
// this does NOT mimick a realistic common mode value distribution
117+
const float signal = amplitude * std::sin(tf * 0.05f);
118+
119+
for (const auto cru : crus) {
120+
const o2::header::DataHeader::SubSpecificationType subSpec{cru << 7};
121+
122+
// ── per-CRU delay ────────────────────────────────────────────────
123+
if (delayCRUs.count(cru)) {
124+
std::this_thread::sleep_for(std::chrono::milliseconds(delayTimeCRUs));
125+
}
126+
127+
// ── CMV values ──────────────────────────────────────────────────
128+
// NTimeBinsPerTF = NPacketsPerTFPerCRU (4) * NTimeBinsPerPacket (3564) = 14256
129+
std::vector<uint16_t> cmvVec;
130+
cmvVec.reserve(o2::tpc::cmv::NTimeBinsPerTF);
131+
for (uint32_t tb = 0; tb < o2::tpc::cmv::NTimeBinsPerTF; ++tb) {
132+
const float val = signal + noise * gRandom->Gaus(0, 1);
133+
o2::tpc::cmv::Data d;
134+
d.setCMVFloat(val);
135+
cmvVec.push_back(d.getCMV());
136+
}
137+
138+
// ── Orbit / BC info ─────────────────────────────────────────────
139+
// One packed (orbit<<32|bc) entry per CMV packet (4 per TF).
140+
// Each packet covers 8 heartbeat orbits (NTimeBinsPerPacket = 3564 = 8 LHC orbits),
141+
// so the orbit advances by 8 per packet and by NPacketsPerTFPerCRU*8 = 32 per TF.
142+
static constexpr uint32_t sOrbitsPerPacket = 8;
143+
std::vector<uint64_t> orbitBCVec;
144+
orbitBCVec.reserve(o2::tpc::cmv::NPacketsPerTFPerCRU);
145+
for (uint32_t pkt = 0; pkt < o2::tpc::cmv::NPacketsPerTFPerCRU; ++pkt) {
146+
const uint32_t orbit = static_cast<uint32_t>(tf * o2::tpc::cmv::NPacketsPerTFPerCRU * sOrbitsPerPacket + pkt * sOrbitsPerPacket);
147+
orbitBCVec.push_back(uint64_t(orbit) << 32); // bc = 0
148+
}
149+
150+
ctx.outputs().snapshot(Output{gDataOriginTPC, "CMVVECTOR", subSpec}, cmvVec);
151+
ctx.outputs().snapshot(Output{gDataOriginTPC, "CMVORBITS", subSpec}, orbitBCVec);
152+
}
153+
154+
// ── global delay ─────────────────────────────────────────────────────
155+
if (delay) {
156+
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(timer::now() - start).count();
157+
if (elapsed < delayTime) {
158+
std::this_thread::sleep_for(std::chrono::milliseconds(delayTime - elapsed));
159+
}
160+
}
161+
162+
if (!(tf % 100)) {
163+
LOGP(info, "Generated CMV data for TF {}", tf);
164+
}
165+
166+
if (tf >= maxTFs - 1) {
167+
ctx.services().get<ControlService>().readyToQuit(QuitRequest::Me);
168+
}
169+
}}};
170+
}
171+
172+
// ─────────────────────────────────────────────────────────────────────────────
173+
WorkflowSpec defineDataProcessing(ConfigContext const& config)
174+
{
175+
const auto tpcCRUs = o2::RangeTokenizer::tokenize<int>(config.options().get<std::string>("crus"));
176+
const std::vector<uint32_t> crus(tpcCRUs.begin(), tpcCRUs.end());
177+
LOGP(info, "crus: {}", fmt::join(crus, ", "));
178+
179+
const auto delayCRUsStr = config.options().get<std::string>("delayCRUs");
180+
std::unordered_set<uint32_t> delayCRUs;
181+
if (!delayCRUsStr.empty()) {
182+
for (const auto cru : o2::RangeTokenizer::tokenize<int>(delayCRUsStr)) {
183+
delayCRUs.insert(static_cast<uint32_t>(cru));
184+
}
185+
}
186+
const std::vector<uint32_t> delayCRUsSorted(delayCRUs.begin(), delayCRUs.end());
187+
LOGP(info, "delayCRUs: {}", fmt::join(delayCRUsSorted, ", "));
188+
189+
const auto rangeTFsDrop = o2::RangeTokenizer::tokenize<int>(config.options().get<std::string>("dropTFsRange"));
190+
const auto timeframes = static_cast<unsigned int>(config.options().get<int>("timeframes"));
191+
const auto delay = config.options().get<bool>("delay");
192+
const auto delayTime = config.options().get<int>("delayTime");
193+
const auto delayTimeCRUs = config.options().get<int>("delayTimeCRUs");
194+
const auto dropTFsRandom = config.options().get<int>("dropTFsRandom");
195+
const auto seed = config.options().get<int>("seed");
196+
const auto amplitude = config.options().get<float>("amplitude");
197+
const auto noise = config.options().get<float>("noise");
198+
199+
gRandom->SetSeed(seed);
200+
201+
WorkflowSpec workflow;
202+
workflow.emplace_back(generateCMVsCRU(crus, delayCRUs, timeframes, delay, delayTime, delayTimeCRUs, dropTFsRandom, rangeTFsDrop, amplitude, noise));
203+
return workflow;
204+
}

0 commit comments

Comments
 (0)