Skip to content

Commit f5ca8b3

Browse files
committed
[hist] Implement RHistEngine::SnapshotAtomic
It returns a consistent copy of the histogram, during concurrent filling. The implementation is based on a successful double collect as described by Afek et al in their 1993 paper "Atomic Snapshots of Shared Memory." The idea is to iterate twice over all bins until no change is observed. This is guaranteed to work for unweighted filling because bin contents are monotonically increasing. With weighted filling and potentially negative weights, there could be the situation where a bin goes back to its previous content and a change is missed. This is probably unlikely in practice, or can be entirely avoided by using RBinWithError that also tracks the sum of squares, which is monotonically increasing even for negative weights.
1 parent f802391 commit f5ca8b3

2 files changed

Lines changed: 96 additions & 0 deletions

File tree

hist/histv7/inc/ROOT/RHistEngine.hxx

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <cassert>
2121
#include <cstddef>
2222
#include <cstdint>
23+
#include <cstring>
2324
#include <stdexcept>
2425
#include <tuple>
2526
#include <type_traits>
@@ -811,6 +812,43 @@ public:
811812
return Slice(sliceSpecs);
812813
}
813814

815+
/// Create an atomic snapshot of this histogram engine.
816+
///
817+
/// A snapshot is a consistent copy of the histogram, during concurrent filling. It is guaranteed that the returned
818+
/// copy represents a state between the begin and end of the snapshot operation.
819+
///
820+
/// Snapshotting a histogram engine with many bins can be an expensive operation.
821+
///
822+
/// \return the atomic snapshot
823+
RHistEngine SnapshotAtomic() const
824+
{
825+
static_assert(std::is_trivially_copyable_v<BinContentType>,
826+
"snapshotting requires a trivially copyable bin content type");
827+
828+
RHistEngine snapshot(fAxes.Get());
829+
// Do a first collect.
830+
for (std::size_t i = 0; i < fBinContents.size(); i++) {
831+
Internal::AtomicLoad(&fBinContents[i], &snapshot.fBinContents[i]);
832+
}
833+
834+
// Now do another collect. If no change is detected, the snapshot is consistent. Otherwise update the bin contents
835+
// and try again.
836+
BinContentType tmp;
837+
bool changed;
838+
do {
839+
changed = false;
840+
for (std::size_t i = 0; i < fBinContents.size(); i++) {
841+
Internal::AtomicLoad(&fBinContents[i], &tmp);
842+
if (std::memcmp(&tmp, &snapshot.fBinContents[i], sizeof(BinContentType))) {
843+
std::memcpy(&snapshot.fBinContents[i], &tmp, sizeof(BinContentType));
844+
changed = true;
845+
}
846+
}
847+
} while (changed);
848+
849+
return snapshot;
850+
}
851+
814852
/// \}
815853

816854
/// %ROOT Streamer function to throw when trying to store an object of this class.

hist/histv7/test/hist_engine_atomic.cxx

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,64 @@ TEST(RHistEngine, StressFillAddAtomicWeight)
267267
EXPECT_EQ(engineA.GetBinContent(0), NOps * Weight);
268268
}
269269

270+
TEST(RHistEngine, SnapshotAtomic)
271+
{
272+
static constexpr std::size_t Bins = 20;
273+
const RRegularAxis axis(Bins, {0, Bins});
274+
RHistEngine<int> engineA({axis});
275+
276+
engineA.Fill(-100);
277+
for (std::size_t i = 0; i < Bins; i++) {
278+
engineA.Fill(i + 0.5);
279+
}
280+
engineA.Fill(100);
281+
282+
RHistEngine<int> engineB = engineA.SnapshotAtomic();
283+
ASSERT_EQ(engineB.GetNDimensions(), 1);
284+
ASSERT_EQ(engineB.GetTotalNBins(), Bins + 2);
285+
286+
EXPECT_EQ(engineB.GetBinContent(RBinIndex::Underflow()), 1);
287+
for (auto index : axis.GetNormalRange()) {
288+
EXPECT_EQ(engineB.GetBinContent(index), 1);
289+
}
290+
EXPECT_EQ(engineB.GetBinContent(RBinIndex::Overflow()), 1);
291+
}
292+
293+
TEST(RHistEngine, StressSnapshotAtomic)
294+
{
295+
static constexpr std::size_t Bins = 20;
296+
static constexpr std::size_t NThreads = 4;
297+
static constexpr std::size_t NFillsPerThread = 100000;
298+
static constexpr std::size_t NSnapshots = 10000;
299+
300+
// Create a histogram with some bins that takes a bit of time to snapshot. The idea of this stress test is then to
301+
// fill the first and last bin from multiple threads. If the snapshot is consistent, it must never have a bigger bin
302+
// content in the last bin.
303+
RHistEngine<int> engine(Bins, {0, Bins});
304+
int first = 0, last = 0;
305+
306+
std::atomic_flag snapshotter;
307+
StressInParallel(NThreads, [&] {
308+
if (!snapshotter.test_and_set()) {
309+
for (std::size_t i = 0; i < NSnapshots; i++) {
310+
RHistEngine<int> snapshot = engine.SnapshotAtomic();
311+
first = snapshot.GetBinContent(0);
312+
last = snapshot.GetBinContent(Bins - 1);
313+
if (last > first) {
314+
return;
315+
}
316+
}
317+
} else {
318+
for (std::size_t i = 0; i < NFillsPerThread; i++) {
319+
engine.FillAtomic(0.5);
320+
engine.FillAtomic(Bins - 0.5);
321+
}
322+
}
323+
});
324+
325+
EXPECT_GE(first, last);
326+
}
327+
270328
TEST(RHistEngine_RBinWithError, AddAtomic)
271329
{
272330
static constexpr std::size_t Bins = 20;

0 commit comments

Comments
 (0)