|
19 | 19 | #include <iostream> |
20 | 20 | #include <iterator> |
21 | 21 | #include <limits> |
| 22 | +#include <mutex> |
| 23 | +#include <thread> |
| 24 | +#include <queue> |
22 | 25 |
|
23 | 26 | namespace o2::mch::calibration |
24 | 27 | { |
@@ -69,45 +72,107 @@ PedestalData::PedestalMatrix PedestalData::initPedestalMatrix(uint16_t solarId) |
69 | 72 | void PedestalData::fill(gsl::span<const PedestalDigit> digits) |
70 | 73 | { |
71 | 74 | bool mDebug = false; |
| 75 | + static std::mutex pedestalMutex; |
| 76 | + static std::set<uint16_t> solarIds = o2::mch::raw::getSolarUIDs<o2::mch::raw::ElectronicMapperGenerated>(); |
72 | 77 |
|
73 | | - for (auto& d : digits) { |
74 | | - uint16_t solarId = d.getSolarId(); |
75 | | - uint8_t dsId = d.getDsId(); |
76 | | - uint8_t channel = d.getChannel(); |
| 78 | + if (digits.empty()) { |
| 79 | + return; |
| 80 | + } |
77 | 81 |
|
78 | | - auto iPedestal = mPedestals.find(solarId); |
| 82 | + LOGP(info, "processing {} digits with {} threads", (int)digits.size(), mNThreads); |
79 | 83 |
|
80 | | - if (iPedestal == mPedestals.end()) { |
81 | | - auto iPedestalsNew = mPedestals.emplace(std::make_pair(solarId, initPedestalMatrix(solarId))); |
82 | | - iPedestal = iPedestalsNew.first; |
83 | | - } |
| 84 | + // fill the queue of SOLAR IDs to be processed |
| 85 | + std::queue<uint16_t> solarQueue; |
| 86 | + for (auto solarId : solarIds) { |
| 87 | + solarQueue.push(solarId); |
| 88 | + } |
84 | 89 |
|
85 | | - if (iPedestal == mPedestals.end()) { |
86 | | - LOGP(fatal, "failed to insert new element in padestals map"); |
87 | | - break; |
88 | | - } |
| 90 | + auto processSolarDigits = [&]() { |
| 91 | + while (true) { |
| 92 | + int targetSolarId = -1; |
| 93 | + PedestalsMap::iterator iPedestal; |
| 94 | + bool pedestalsAreInitialized; |
| 95 | + |
| 96 | + // non thread-safe access to solarQueue, protected by the pedestalMutex |
| 97 | + { |
| 98 | + std::lock_guard<std::mutex> lock(pedestalMutex); |
| 99 | + |
| 100 | + // stop when there are no mor SOLAR IDs to process |
| 101 | + if (solarQueue.empty()) { |
| 102 | + break; |
| 103 | + } |
| 104 | + |
| 105 | + // get the next SOLAR ID to be processed |
| 106 | + targetSolarId = solarQueue.front(); |
| 107 | + solarQueue.pop(); |
| 108 | + |
| 109 | + // update the iterator to the pedestal data for the target SOLAR |
| 110 | + iPedestal = mPedestals.find(targetSolarId); |
| 111 | + if (iPedestal == mPedestals.end()) { |
| 112 | + pedestalsAreInitialized = false; |
| 113 | + } else { |
| 114 | + pedestalsAreInitialized = true; |
| 115 | + } |
| 116 | + } |
89 | 117 |
|
90 | | - auto& ped = iPedestal->second[dsId][channel]; |
| 118 | + // loop over digits, selecting only those belonging to the target SOLAR |
| 119 | + for (auto& d : digits) { |
| 120 | + uint16_t solarId = d.getSolarId(); |
| 121 | + if (solarId != targetSolarId) { |
| 122 | + continue; |
| 123 | + } |
91 | 124 |
|
92 | | - for (uint16_t i = 0; i < d.nofSamples(); i++) { |
93 | | - auto s = d.getSample(i); |
| 125 | + // non thread-safe access to Pedestals structure, protected by the pedestalMutex |
| 126 | + if (!pedestalsAreInitialized) { |
| 127 | + std::lock_guard<std::mutex> lock(pedestalMutex); |
94 | 128 |
|
95 | | - ped.mEntries += 1; |
96 | | - uint64_t N = ped.mEntries; |
| 129 | + // create the pedestals structure corresponding to the SOLAR ID to be processed |
| 130 | + iPedestal = mPedestals.emplace(std::make_pair(targetSolarId, initPedestalMatrix(targetSolarId))).first; |
97 | 131 |
|
98 | | - double p0 = ped.mPedestal; |
99 | | - double p = p0 + (s - p0) / N; |
100 | | - ped.mPedestal = p; |
| 132 | + if (iPedestal == mPedestals.end()) { |
| 133 | + LOGP(fatal, "failed to insert new element in padestals map"); |
| 134 | + break; |
| 135 | + } |
| 136 | + pedestalsAreInitialized = true; |
| 137 | + } |
101 | 138 |
|
102 | | - double M0 = ped.mVariance; |
103 | | - double M = M0 + (s - p0) * (s - p); |
104 | | - ped.mVariance = M; |
105 | | - } |
| 139 | + uint8_t dsId = d.getDsId(); |
| 140 | + uint8_t channel = d.getChannel(); |
| 141 | + |
| 142 | + auto& ped = iPedestal->second[dsId][channel]; |
| 143 | + |
| 144 | + for (uint16_t i = 0; i < d.nofSamples(); i++) { |
| 145 | + auto s = d.getSample(i); |
106 | 146 |
|
107 | | - if (mDebug) { |
108 | | - LOGP(info, "solarId {} dsId {} ch {} nsamples {} entries{} mean {} variance {}", |
109 | | - (int)solarId, (int)dsId, (int)channel, d.nofSamples(), ped.mEntries, ped.mPedestal, ped.mVariance); |
| 147 | + ped.mEntries += 1; |
| 148 | + uint64_t N = ped.mEntries; |
| 149 | + |
| 150 | + double p0 = ped.mPedestal; |
| 151 | + double p = p0 + (s - p0) / N; |
| 152 | + ped.mPedestal = p; |
| 153 | + |
| 154 | + double M0 = ped.mVariance; |
| 155 | + double M = M0 + (s - p0) * (s - p); |
| 156 | + ped.mVariance = M; |
| 157 | + } |
| 158 | + |
| 159 | + if (mDebug) { |
| 160 | + LOGP(info, "solarId {} dsId {} ch {} nsamples {} entries{} mean {} variance {}", |
| 161 | + (int)solarId, (int)dsId, (int)channel, d.nofSamples(), ped.mEntries, ped.mPedestal, ped.mVariance); |
| 162 | + } |
| 163 | + } |
110 | 164 | } |
| 165 | + }; |
| 166 | + |
| 167 | + // process the digits in parallel threads |
| 168 | + std::vector<std::thread> threads; |
| 169 | + for (int ti = 0; ti < mNThreads; ti++) { |
| 170 | + threads.emplace_back(processSolarDigits); |
| 171 | + } |
| 172 | + |
| 173 | + // wait for all threads to finish processing |
| 174 | + for (auto& thread : threads) { |
| 175 | + thread.join(); |
111 | 176 | } |
112 | 177 | } |
113 | 178 |
|
|
0 commit comments