diff --git a/roottest/root/dataframe/testIMT.cxx b/roottest/root/dataframe/testIMT.cxx index 74648ac7bff1b..9fa1d23b5adec 100644 --- a/roottest/root/dataframe/testIMT.cxx +++ b/roottest/root/dataframe/testIMT.cxx @@ -55,18 +55,18 @@ void getTracks(unsigned int mu, FourVectors& tracks) { // This makes the example stand-alone void FillTree(const char* filename, const char* treeName) { if (!gSystem->AccessPathName(filename)) return; - TFile f(filename,"RECREATE"); - TTree t(treeName,treeName); + auto f = std::make_unique(filename, "RECREATE"); + auto t = std::make_unique(treeName, treeName); double b1; int b2; std::vector tracks; std::vector dv {-1,2,3,4}; std::list sl {1,2,3,4}; - t.Branch("b1", &b1); - t.Branch("b2", &b2); - t.Branch("tracks", &tracks); - t.Branch("dv", &dv); - t.Branch("sl", &sl); + t->Branch("b1", &b1); + t->Branch("b2", &b2); + t->Branch("tracks", &tracks); + t->Branch("dv", &dv); + t->Branch("sl", &sl); int nevts = 16000; for(int i = 0; i < nevts; ++i) { @@ -77,11 +77,9 @@ void FillTree(const char* filename, const char* treeName) { dv.emplace_back(i); sl.emplace_back(i); - t.Fill(); + t->Fill(); } - t.Write(); - f.Close(); - return; + f->Write(); } auto fileName = "testIMT.root"; diff --git a/tree/dataframe/CMakeLists.txt b/tree/dataframe/CMakeLists.txt index 01ddbc3e3774e..abb05a3714f98 100644 --- a/tree/dataframe/CMakeLists.txt +++ b/tree/dataframe/CMakeLists.txt @@ -82,6 +82,7 @@ ROOT_STANDARD_LIBRARY_PACKAGE(ROOTDataFrame ROOT/RDF/RJittedVariation.hxx ROOT/RDF/RLazyDSImpl.hxx ROOT/RDF/RLoopManager.hxx + ROOT/RDF/RMaskedEntryRange.hxx ROOT/RDF/RMergeableValue.hxx ROOT/RDF/RMetaData.hxx ROOT/RDF/RNodeBase.hxx diff --git a/tree/dataframe/inc/ROOT/RCsvDS.hxx b/tree/dataframe/inc/ROOT/RCsvDS.hxx index 89bf6188edacc..592aa94eeba8b 100644 --- a/tree/dataframe/inc/ROOT/RCsvDS.hxx +++ b/tree/dataframe/inc/ROOT/RCsvDS.hxx @@ -27,7 +27,8 @@ namespace ROOT::Internal::RDF { class R__CLING_PTRCHECK(off) RCsvDSColumnReader final : public ROOT::Detail::RDF::RColumnReaderBase { void *fValuePtr; - void *GetImpl(Long64_t) final { return fValuePtr; } + void *GetImpl(std::size_t) final { return fValuePtr; } + void LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &) final {} public: RCsvDSColumnReader(void *valuePtr) : fValuePtr(valuePtr) {} diff --git a/tree/dataframe/inc/ROOT/RDF/RAction.hxx b/tree/dataframe/inc/ROOT/RDF/RAction.hxx index 2a9e512f65633..1568313a967f8 100644 --- a/tree/dataframe/inc/ROOT/RDF/RAction.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RAction.hxx @@ -99,31 +99,34 @@ public: } template - auto GetValueChecked(unsigned int slot, std::size_t readerIdx, Long64_t entry) -> ColType & + auto GetValueChecked(unsigned int slot, std::size_t readerIdx, std::size_t idx) -> ColType & { - if (auto *val = fValues[slot][readerIdx]->template TryGet(entry)) + if (auto *val = fValues[slot][readerIdx]->template TryGet(idx)) return *val; throw std::out_of_range{"RDataFrame: Action (" + fHelper.GetActionName() + ") could not retrieve value for column '" + fColumnNames[readerIdx] + "' for entry " + - std::to_string(entry) + + std::to_string(idx) + ". You can use the DefaultValueFor operation to provide a default value, or " "FilterAvailable/FilterMissing to discard/keep entries with missing values instead."}; } template - void CallExec(unsigned int slot, Long64_t entry, TypeList, std::index_sequence) + void CallExec(unsigned int slot, std::size_t idx, TypeList, std::index_sequence) { ROOT::Internal::RDF::CallGuaranteedOrder{[&](auto &&...args) { return fHelper.Exec(slot, args...); }, - GetValueChecked(slot, S, entry)...}; - (void)entry; // avoid unused parameter warning (gcc 12.1) + GetValueChecked(slot, S, idx)...}; + (void)idx; // avoid unused parameter warning (gcc 12.1) } - void Run(unsigned int slot, Long64_t entry) final + void Run(unsigned int slot, Long64_t bulkBeginEntry, std::size_t bulkSize) final { - // check if entry passes all filters - if (fPrevNode.CheckFilters(slot, entry)) - CallExec(slot, entry, ColumnTypes_t{}, TypeInd_t{}); + const auto mask = fPrevNode.CheckFilters(slot, bulkBeginEntry, bulkSize); + std::for_each(fValues[slot].begin(), fValues[slot].end(), [&mask](auto *v) { v->Load(mask); }); + + // Assume 1-size bulk for now + if (mask[0]) + CallExec(slot, /*idx=*/0u, ColumnTypes_t{}, TypeInd_t{}); } void TriggerChildrenCount() final { fPrevNode.IncrChildrenCount(); } diff --git a/tree/dataframe/inc/ROOT/RDF/RActionBase.hxx b/tree/dataframe/inc/ROOT/RDF/RActionBase.hxx index ff765a7ca69cd..a95ba5a8734f6 100644 --- a/tree/dataframe/inc/ROOT/RDF/RActionBase.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RActionBase.hxx @@ -63,7 +63,6 @@ public: RColumnRegister &GetColRegister() { return fColRegister; } RLoopManager *GetLoopManager() { return fLoopManager; } unsigned int GetNSlots() const { return fNSlots; } - virtual void Run(unsigned int slot, Long64_t entry) = 0; virtual void Initialize() = 0; virtual void InitSlot(TTreeReader *r, unsigned int slot) = 0; virtual void TriggerChildrenCount() = 0; @@ -92,6 +91,8 @@ public: virtual std::unique_ptr MakeVariedAction(std::vector &&results) = 0; virtual std::unique_ptr CloneAction(void *newResult) = 0; + + virtual void Run(unsigned int slot, Long64_t bulkBeginEntry, std::size_t bulkSize) = 0; }; } // namespace RDF } // namespace Internal diff --git a/tree/dataframe/inc/ROOT/RDF/RActionSnapshot.hxx b/tree/dataframe/inc/ROOT/RDF/RActionSnapshot.hxx index 510b8b5ec273b..1c3e9a7a87e34 100644 --- a/tree/dataframe/inc/ROOT/RDF/RActionSnapshot.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RActionSnapshot.hxx @@ -166,55 +166,62 @@ public: fHelper.InitTask(r, slot); } - void *GetValue(unsigned int slot, std::size_t readerIdx, Long64_t entry) + void *GetValue(unsigned int slot, std::size_t readerIdx, std::size_t idx) { assert(slot < fValues.size()); assert(readerIdx < fValues[slot].size()); - if (auto *val = fValues[slot][readerIdx]->template TryGet(entry)) + if (auto *val = fValues[slot][readerIdx]->template TryGet(idx)) return val; throw std::out_of_range{"RDataFrame: Action (" + fHelper.GetActionName() + ") could not retrieve value for column '" + fColumnNames[readerIdx] + "' for entry " + - std::to_string(entry) + + std::to_string(idx) + ". You can use the DefaultValueFor operation to provide a default value, or " "FilterAvailable/FilterMissing to discard/keep entries with missing values instead."}; } - void CallExec(unsigned int slot, Long64_t entry) + void CallExec(unsigned int slot, std::size_t idx) { std::vector untypedValues; auto nReaders = fValues[slot].size(); untypedValues.reserve(nReaders); for (decltype(nReaders) readerIdx{}; readerIdx < nReaders; readerIdx++) - untypedValues.push_back(GetValue(slot, readerIdx, entry)); + untypedValues.push_back(GetValue(slot, readerIdx, idx)); fHelper.Exec(slot, untypedValues); } - void Run(unsigned int slot, Long64_t entry) final + void Run(unsigned int slot, Long64_t bulkBeginEntry, std::size_t bulkSize) final { if constexpr (std::is_same_v) { // check if entry passes all filters - std::vector filterPassed(fPrevNodes.size(), false); + std::vector filterPassed(fPrevNodes.size(), 1ul); for (unsigned int variation = 0; variation < fPrevNodes.size(); ++variation) { - filterPassed[variation] = fPrevNodes[variation]->CheckFilters(slot, entry); + filterPassed[variation] = fPrevNodes[variation]->CheckFilters(slot, bulkBeginEntry, bulkSize); } // Currently, every event where any of nominal or variations pass gets written to the output. // This logic could be extended for different use cases if the need arises. - if (std::any_of(filterPassed.begin(), filterPassed.end(), [](bool val) { return val; })) { + if (std::any_of(filterPassed.begin(), filterPassed.end(), + [](const ROOT::Internal::RDF::RMaskedEntryRange &val) { return val[0]; })) { // TODO: Don't allocate std::vector untypedValues; auto nReaders = fValues[slot].size(); untypedValues.reserve(nReaders); + std::for_each(fValues[slot].begin(), fValues[slot].end(), [bulkBeginEntry, bulkSize](auto *v) { + v->Load( + ROOT::Internal::RDF::RMaskedEntryRange{bulkSize, true, static_cast(bulkBeginEntry)}); + }); for (decltype(nReaders) readerIdx{}; readerIdx < nReaders; readerIdx++) - untypedValues.push_back(GetValue(slot, readerIdx, entry)); + untypedValues.push_back(GetValue(slot, readerIdx, /*idx=*/0u)); fHelper.Exec(slot, untypedValues, filterPassed); } } else { - if (fPrevNodes.front()->CheckFilters(slot, entry)) - CallExec(slot, entry); + const auto mask = fPrevNodes.front()->CheckFilters(slot, bulkBeginEntry, bulkSize); + std::for_each(fValues[slot].begin(), fValues[slot].end(), [&mask](auto *v) { v->Load(mask); }); + if (mask[0]) + CallExec(slot, /*idx=*/0u); } } diff --git a/tree/dataframe/inc/ROOT/RDF/RColumnReaderBase.hxx b/tree/dataframe/inc/ROOT/RDF/RColumnReaderBase.hxx index 29aeee4f306e7..be5f8a6847f55 100644 --- a/tree/dataframe/inc/ROOT/RDF/RColumnReaderBase.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RColumnReaderBase.hxx @@ -12,6 +12,7 @@ #define ROOT_INTERNAL_RDF_RCOLUMNREADERBASE #include +#include namespace ROOT { namespace Detail { @@ -26,9 +27,15 @@ This pure virtual class provides a common base class for the different column re RDSColumnReader. **/ class R__CLING_PTRCHECK(off) RColumnReaderBase { + public: virtual ~RColumnReaderBase() = default; + /// Load the column value for the given entry. + /// \param entry The entry number to load. + /// \param mask The entry mask. Values will be loaded only for entries for which the mask equals true. + void Load(const ROOT::Internal::RDF::RMaskedEntryRange &mask) { LoadImpl(mask); } + /// Return the column value for the given entry. /// \tparam T The column type /// \param entry The entry number @@ -36,13 +43,14 @@ public: /// The caller is responsible for checking that the returned value actually /// exists. template - T *TryGet(Long64_t entry) + T *TryGet(std::size_t idx) { - return static_cast(GetImpl(entry)); + return static_cast(GetImpl(idx)); } private: - virtual void *GetImpl(Long64_t entry) = 0; + virtual void *GetImpl(std::size_t idx) = 0; + virtual void LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &) = 0; }; } // namespace RDF diff --git a/tree/dataframe/inc/ROOT/RDF/RDSColumnReader.hxx b/tree/dataframe/inc/ROOT/RDF/RDSColumnReader.hxx index 2e317573102b5..fe741579a6250 100644 --- a/tree/dataframe/inc/ROOT/RDF/RDSColumnReader.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RDSColumnReader.hxx @@ -23,7 +23,8 @@ template class R__CLING_PTRCHECK(off) RDSColumnReader final : public ROOT::Detail::RDF::RColumnReaderBase { T **fDSValuePtr = nullptr; - void *GetImpl(Long64_t) final { return *fDSValuePtr; } + void *GetImpl(std::size_t) final { return *fDSValuePtr; } + void LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &) final {} public: RDSColumnReader(void *DSValuePtr) : fDSValuePtr(static_cast(DSValuePtr)) {} diff --git a/tree/dataframe/inc/ROOT/RDF/RDefaultValueFor.hxx b/tree/dataframe/inc/ROOT/RDF/RDefaultValueFor.hxx index a03af81745a1b..f8c1dc8ef0c87 100644 --- a/tree/dataframe/inc/ROOT/RDF/RDefaultValueFor.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RDefaultValueFor.hxx @@ -45,11 +45,12 @@ template class R__CLING_PTRCHECK(off) RDefaultValueFor final : public RDefineBase { using ColumnTypes_t = ROOT::TypeTraits::TypeList; using TypeInd_t = std::make_index_sequence; - // Avoid instantiating vector as `operator[]` returns temporaries in that case. Use std::deque instead. - using ValuesPerSlot_t = std::conditional_t::value, std::deque, std::vector>; + + using ValuesPerSlot_t = std::vector>; T fDefaultValue; - ValuesPerSlot_t fLastResults; + // Each slot accesses a cache of values for the current bulk + ValuesPerSlot_t fCachedResultsPerSlot; // One column reader per slot std::vector fValues; @@ -57,9 +58,9 @@ class R__CLING_PTRCHECK(off) RDefaultValueFor final : public RDefineBase { /// The map key is the full variation name, e.g. "pt:up". std::unordered_map> fVariedDefines; - T &GetValueOrDefault(unsigned int slot, Long64_t entry) + T &GetValueOrDefault(unsigned int slot, std::size_t idx) { - if (auto *value = fValues[slot]->template TryGet(entry)) + if (auto *value = fValues[slot]->template TryGet(idx)) return *value; else return fDefaultValue; @@ -71,12 +72,16 @@ public: RLoopManager &lm, const std::string &variationName = "nominal") : RDefineBase(name, type, colRegister, lm, columns, variationName), fDefaultValue(defaultValue), - fLastResults(lm.GetNSlots() * RDFInternal::CacheLineStep()), + fCachedResultsPerSlot(lm.GetNSlots() * RDFInternal::CacheLineStep>()), fValues(lm.GetNSlots()) { fLoopManager->Register(this); // We suppress errors that TTreeReader prints regarding the missing branch fLoopManager->InsertSuppressErrorsForMissingBranch(fColumnNames[0]); + // Assume 1-size bulk for now + for (decltype(lm.GetNSlots()) i = 0; i < lm.GetNSlots(); ++i) { + fCachedResultsPerSlot[i * RDFInternal::CacheLineStep>()].resize(1ul); + } } RDefaultValueFor(const RDefaultValueFor &) = delete; @@ -97,20 +102,30 @@ public: fLastCheckedEntry[slot * RDFInternal::CacheLineStep()] = -1; } - /// Return the (type-erased) address of the Define'd value for the given processing slot. + /// Return the beginning of the cached results of the current bulk for the input processing slot void *GetValuePtr(unsigned int slot) final { - return static_cast(&fLastResults[slot * RDFInternal::CacheLineStep()]); + return static_cast(fCachedResultsPerSlot[slot * RDFInternal::CacheLineStep>()].data()); } /// Update the value at the address returned by GetValuePtr with the content corresponding to the given entry - void Update(unsigned int slot, Long64_t entry) final + void Update(unsigned int slot, const ROOT::Internal::RDF::RMaskedEntryRange &mask) final { - if (entry != fLastCheckedEntry[slot * RDFInternal::CacheLineStep()]) { - // evaluate this define expression, cache the result - fLastResults[slot * RDFInternal::CacheLineStep()] = GetValueOrDefault(slot, entry); - fLastCheckedEntry[slot * RDFInternal::CacheLineStep()] = entry; + if (static_cast(mask.GetFirstEntry()) == + fLastCheckedEntry[slot * RDFInternal::CacheLineStep()]) + return; + + // Assume 1-size bulk for now + fValues[slot]->Load(mask); + const std::size_t bulkSize = fLoopManager->GetCurrentBulkSize(); + auto &result = fCachedResultsPerSlot[slot * RDFInternal::CacheLineStep>()]; + result.clear(); + result.resize(bulkSize); + for (std::size_t i = 0; i < bulkSize; ++i) { + if (mask[i]) + fCachedResultsPerSlot[slot * RDFInternal::CacheLineStep>()][i] = GetValueOrDefault(slot, i); } + fLastCheckedEntry[slot * RDFInternal::CacheLineStep()] = mask.GetFirstEntry(); } void Update(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo & /*id*/) final {} diff --git a/tree/dataframe/inc/ROOT/RDF/RDefine.hxx b/tree/dataframe/inc/ROOT/RDF/RDefine.hxx index 8c9a57351079a..30f64e09a96e9 100644 --- a/tree/dataframe/inc/ROOT/RDF/RDefine.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RDefine.hxx @@ -56,12 +56,11 @@ class R__CLING_PTRCHECK(off) RDefine final : public RDefineBase { RDFInternal::RemoveFirstTwoParametersIf_t::value, ColumnTypesTmp_t>; using TypeInd_t = std::make_index_sequence; using ret_type = typename CallableTraits::ret_type; - // Avoid instantiating vector as `operator[]` returns temporaries in that case. Use std::deque instead. - using ValuesPerSlot_t = - std::conditional_t::value, std::deque, std::vector>; + using ValuesPerSlot_t = std::vector>; F fExpression; - ValuesPerSlot_t fLastResults; + // Each slot accesses a cache of values for the current bulk + ValuesPerSlot_t fCachedResultsPerSlot; /// Column readers per slot and per input column std::vector> fValues; @@ -71,49 +70,59 @@ class R__CLING_PTRCHECK(off) RDefine final : public RDefineBase { std::unordered_map> fVariedDefines; template - auto GetValueChecked(unsigned int slot, std::size_t readerIdx, Long64_t entry) -> ColType & + auto GetValueChecked(unsigned int slot, std::size_t readerIdx, std::size_t idx) -> ColType & { - if (auto *val = fValues[slot][readerIdx]->template TryGet(entry)) + if (auto *val = fValues[slot][readerIdx]->template TryGet(idx)) return *val; throw std::out_of_range{"RDataFrame: Define could not retrieve value for column '" + fColumnNames[readerIdx] + - "' for entry " + std::to_string(entry) + + "' for entry " + std::to_string(idx) + ". You can use the DefaultValueFor operation to provide a default value, or " "FilterAvailable/FilterMissing to discard/keep entries with missing values instead."}; } template - void UpdateHelper(unsigned int slot, Long64_t entry, TypeList, std::index_sequence, NoneTag) + auto UpdateHelper(unsigned int slot, std::size_t idx, Long64_t /*entry*/, TypeList, + std::index_sequence, NoneTag) { - fLastResults[slot * RDFInternal::CacheLineStep()] = - fExpression(GetValueChecked(slot, S, entry)...); - (void)entry; // avoid unused parameter warning (gcc 12.1) + return fExpression(GetValueChecked(slot, S, idx)...); + (void)slot; // avoid unused parameter warning + (void)idx; // avoid unused parameter warning } template - void UpdateHelper(unsigned int slot, Long64_t entry, TypeList, std::index_sequence, SlotTag) + auto UpdateHelper(unsigned int slot, std::size_t idx, Long64_t /*entry*/, TypeList, + std::index_sequence, SlotTag) { - fLastResults[slot * RDFInternal::CacheLineStep()] = - fExpression(slot, GetValueChecked(slot, S, entry)...); - (void)entry; // avoid unused parameter warning (gcc 12.1) + return fExpression(slot, GetValueChecked(slot, S, idx)...); + (void)slot; // avoid unused parameter warning + (void)idx; // avoid unused parameter warning } template - void - UpdateHelper(unsigned int slot, Long64_t entry, TypeList, std::index_sequence, SlotAndEntryTag) + auto UpdateHelper(unsigned int slot, std::size_t idx, Long64_t entryInBatch, TypeList, + std::index_sequence, SlotAndEntryTag) { - fLastResults[slot * RDFInternal::CacheLineStep()] = - fExpression(slot, entry, GetValueChecked(slot, S, entry)...); + return fExpression(slot, entryInBatch, GetValueChecked(slot, S, idx)...); + (void)slot; // avoid unused parameter warning + (void)idx; // avoid unused parameter warning + (void)entryInBatch; // avoid unused parameter warning } public: RDefine(std::string_view name, std::string_view type, F expression, const ROOT::RDF::ColumnNames_t &columns, const RDFInternal::RColumnRegister &colRegister, RLoopManager &lm, const std::string &variationName = "nominal") - : RDefineBase(name, type, colRegister, lm, columns, variationName), fExpression(std::move(expression)), - fLastResults(lm.GetNSlots() * RDFInternal::CacheLineStep()), fValues(lm.GetNSlots()) + : RDefineBase(name, type, colRegister, lm, columns, variationName), + fExpression(std::move(expression)), + fCachedResultsPerSlot(lm.GetNSlots() * RDFInternal::CacheLineStep>()), + fValues(lm.GetNSlots()) { fLoopManager->Register(this); + // Assume 1-size bulk for now + for (decltype(lm.GetNSlots()) i = 0; i < lm.GetNSlots(); ++i) { + fCachedResultsPerSlot[i * RDFInternal::CacheLineStep>()].resize(1ul); + } } RDefine(const RDefine &) = delete; @@ -127,19 +136,31 @@ public: fLastCheckedEntry[slot * RDFInternal::CacheLineStep()] = -1; } - /// Return the (type-erased) address of the Define'd value for the given processing slot. + /// Return the beginning of the cached results of the current bulk for the input processing slot void *GetValuePtr(unsigned int slot) final { - return static_cast(&fLastResults[slot * RDFInternal::CacheLineStep()]); + return static_cast( + fCachedResultsPerSlot[slot * RDFInternal::CacheLineStep>()].data()); } - /// Update the value at the address returned by GetValuePtr with the content corresponding to the given entry - void Update(unsigned int slot, Long64_t entry) final + /// Update the values at the array returned by GetValuePtr with the content corresponding to the given mask + void Update(unsigned int slot, const ROOT::Internal::RDF::RMaskedEntryRange &mask) final { - if (entry != fLastCheckedEntry[slot * RDFInternal::CacheLineStep()]) { - // evaluate this define expression, cache the result - UpdateHelper(slot, entry, ColumnTypes_t{}, TypeInd_t{}, ExtraArgsTag{}); - fLastCheckedEntry[slot * RDFInternal::CacheLineStep()] = entry; + if (static_cast(mask.GetFirstEntry()) == + fLastCheckedEntry[slot * RDFInternal::CacheLineStep()]) + return; + + std::for_each(fValues[slot].begin(), fValues[slot].end(), [&mask](auto *v) { v->Load(mask); }); + // Assume 1-size bulk for now + const std::size_t bulkSize = 1; + auto &result = fCachedResultsPerSlot[slot * RDFInternal::CacheLineStep>()]; + result.clear(); + result.resize(bulkSize); + for (std::size_t i = 0; i < bulkSize; ++i) { + if (mask[i]) { + result[i] = UpdateHelper(slot, i, mask.GetFirstEntry() + i, ColumnTypes_t{}, TypeInd_t{}, ExtraArgsTag{}); + fLastCheckedEntry[slot * RDFInternal::CacheLineStep()] = mask.GetFirstEntry(); + } } } diff --git a/tree/dataframe/inc/ROOT/RDF/RDefineBase.hxx b/tree/dataframe/inc/ROOT/RDF/RDefineBase.hxx index f20b77e0b3286..730552a5c762b 100644 --- a/tree/dataframe/inc/ROOT/RDF/RDefineBase.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RDefineBase.hxx @@ -16,6 +16,7 @@ #include "ROOT/RDF/RSampleInfo.hxx" #include "ROOT/RDF/Utils.hxx" #include "ROOT/RVec.hxx" +#include #include #include @@ -40,7 +41,9 @@ class RDefineBase { protected: const std::string fName; ///< The name of the custom column const std::string fType; ///< The type of the custom column as a text string - std::vector fLastCheckedEntry; + std::vector fLastCheckedEntry; /// Starting entry of the last bulk processed, per slot + /// Which entries in the current bulk are valid, per slot + std::vector fMaskPerSlot; RDFInternal::RColumnRegister fColRegister; RLoopManager *fLoopManager; // non-owning pointer to the RLoopManager const ROOT::RDF::ColumnNames_t fColumnNames; @@ -63,7 +66,7 @@ public: std::string GetName() const; std::string GetTypeName() const; /// Update the value at the address returned by GetValuePtr with the content corresponding to the given entry - virtual void Update(unsigned int slot, Long64_t entry) = 0; + virtual void Update(unsigned int slot, const ROOT::Internal::RDF::RMaskedEntryRange &mask) = 0; /// Update function to be called once per sample, used if the derived type is a RDefinePerSample virtual void Update(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &/*id*/) {} /// Clean-up operations to be performed at the end of a task. diff --git a/tree/dataframe/inc/ROOT/RDF/RDefinePerSample.hxx b/tree/dataframe/inc/ROOT/RDF/RDefinePerSample.hxx index 0302113345774..1b1a432fec2d7 100644 --- a/tree/dataframe/inc/ROOT/RDF/RDefinePerSample.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RDefinePerSample.hxx @@ -30,22 +30,25 @@ template class R__CLING_PTRCHECK(off) RDefinePerSample final : public RDefineBase { using RetType_t = typename CallableTraits::ret_type; - // Avoid instantiating vector as `operator[]` returns temporaries in that case. Use std::deque instead. - using ValuesPerSlot_t = - std::conditional_t::value, std::deque, std::vector>; + using ValuesPerSlot_t = std::vector>; F fExpression; - ValuesPerSlot_t fLastResults; + // Each slot accesses a cache of values for the current bulk + ValuesPerSlot_t fCachedResultsPerSlot; public: RDefinePerSample(std::string_view name, std::string_view type, F expression, RLoopManager &lm) : RDefineBase(name, type, RDFInternal::RColumnRegister{&lm}, lm, /*columnNames*/ {}), fExpression(std::move(expression)), - fLastResults(lm.GetNSlots() * RDFInternal::CacheLineStep()) + fCachedResultsPerSlot(lm.GetNSlots() * RDFInternal::CacheLineStep>()) { fLoopManager->Register(this); auto callUpdate = [this](unsigned int slot, const ROOT::RDF::RSampleInfo &id) { this->Update(slot, id); }; fLoopManager->AddSampleCallback(this, std::move(callUpdate)); + // Assume 1-size bulk for now + for (decltype(lm.GetNSlots()) i = 0; i < lm.GetNSlots(); ++i) { + fCachedResultsPerSlot[i * RDFInternal::CacheLineStep>()].resize(1ul); + } } RDefinePerSample(const RDefinePerSample &) = delete; @@ -53,13 +56,14 @@ public: ~RDefinePerSample() { fLoopManager->Deregister(this); } - /// Return the (type-erased) address of the Define'd value for the given processing slot. + /// Return the beginning of the cached results of the current bulk for the input processing slot void *GetValuePtr(unsigned int slot) final { - return static_cast(&fLastResults[slot * RDFInternal::CacheLineStep()]); + return static_cast( + fCachedResultsPerSlot[slot * RDFInternal::CacheLineStep>()].data()); } - void Update(unsigned int, Long64_t) final + void Update(unsigned int, const ROOT::Internal::RDF::RMaskedEntryRange &) final { // no-op } @@ -67,7 +71,8 @@ public: /// Update the value at the address returned by GetValuePtr with the content corresponding to the given entry void Update(unsigned int slot, const ROOT::RDF::RSampleInfo &id) final { - fLastResults[slot * RDFInternal::CacheLineStep()] = fExpression(slot, id); + // Assume 1-size bulk for now + fCachedResultsPerSlot[slot * RDFInternal::CacheLineStep>()][0] = fExpression(slot, id); } const std::type_info &GetTypeId() const final { return typeid(RetType_t); } diff --git a/tree/dataframe/inc/ROOT/RDF/RDefineReader.hxx b/tree/dataframe/inc/ROOT/RDF/RDefineReader.hxx index 754d4cdfcb5fe..2b10460e0a725 100644 --- a/tree/dataframe/inc/ROOT/RDF/RDefineReader.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RDefineReader.hxx @@ -42,11 +42,9 @@ class R__CLING_PTRCHECK(off) RDefineReader final : public ROOT::Detail::RDF::RCo /// The slot this value belongs to. unsigned int fSlot = std::numeric_limits::max(); - void *GetImpl(Long64_t entry) final - { - fDefine.Update(fSlot, entry); - return fValuePtr; - } + void *GetImpl(std::size_t /*idx*/) final { return fValuePtr; } + + void LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &mask) final { fDefine.Update(fSlot, mask); } public: RDefineReader(unsigned int slot, RDFDetail::RDefineBase &define) diff --git a/tree/dataframe/inc/ROOT/RDF/RFilter.hxx b/tree/dataframe/inc/ROOT/RDF/RFilter.hxx index a884fa6b631ba..6f4762b7073b3 100644 --- a/tree/dataframe/inc/ROOT/RDF/RFilter.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RFilter.hxx @@ -93,45 +93,56 @@ public: fLoopManager->Deregister(this); } - bool CheckFilters(unsigned int slot, Long64_t entry) final + ROOT::Internal::RDF::RMaskedEntryRange + CheckFilters(unsigned int slot, Long64_t bulkBeginEntry, std::size_t bulkSize) final { - if (entry != fLastCheckedEntry[slot * RDFInternal::CacheLineStep()]) { - if (!fPrevNode.CheckFilters(slot, entry)) { - // a filter upstream returned false, cache the result - fLastResult[slot * RDFInternal::CacheLineStep()] = false; - } else { - // evaluate this filter, cache the result - auto passed = CheckFilterHelper(slot, entry, ColumnTypes_t{}, TypeInd_t{}); - passed ? ++fAccepted[slot * RDFInternal::CacheLineStep()] - : ++fRejected[slot * RDFInternal::CacheLineStep()]; - fLastResult[slot * RDFInternal::CacheLineStep()] = passed; + auto &cachedResults = fCachedResults[slot * RDFInternal::CacheLineStep>()]; + if (bulkBeginEntry == fLastCheckedEntry[slot * ROOT::Internal::RDF::CacheLineStep()]) + return {cachedResults, static_cast(bulkBeginEntry)}; + + auto mask = fPrevNode.CheckFilters(slot, bulkBeginEntry, bulkSize); + std::for_each(fValues[slot].begin(), fValues[slot].end(), [&mask](auto *v) { v->Load(mask); }); + + std::size_t accepted{0}; + std::size_t rejected{0}; + cachedResults.clear(); + cachedResults.resize(bulkSize); + for (std::size_t i = 0; i < bulkSize; ++i) { + if (mask[i]) { + cachedResults[i] = CheckFilterHelper(slot, i, ColumnTypes_t{}, TypeInd_t{}); + if (cachedResults[i]) + ++accepted; + else + ++rejected; } - fLastCheckedEntry[slot * RDFInternal::CacheLineStep()] = entry; } - return fLastResult[slot * RDFInternal::CacheLineStep()]; + fLastCheckedEntry[slot * ROOT::Internal::RDF::CacheLineStep()] = bulkBeginEntry; + fAccepted[slot * RDFInternal::CacheLineStep()] += accepted; + fRejected[slot * RDFInternal::CacheLineStep()] += rejected; + + return {cachedResults, static_cast(bulkBeginEntry)}; + } + + template + bool CheckFilterHelper(unsigned int slot, std::size_t idx, TypeList, std::index_sequence) + { + return fFilter(GetValueChecked(slot, S, idx)...); + (void)slot; // avoid unused parameter warning + (void)idx; // avoid unused parameter warning } template - auto GetValueChecked(unsigned int slot, std::size_t readerIdx, Long64_t entry) -> ColType & + auto GetValueChecked(unsigned int slot, std::size_t readerIdx, std::size_t idx) -> ColType & { - if (auto *val = fValues[slot][readerIdx]->template TryGet(entry)) + if (auto *val = fValues[slot][readerIdx]->template TryGet(idx)) return *val; throw std::out_of_range{"RDataFrame: Filter could not retrieve value for column '" + fColumnNames[readerIdx] + - "' for entry " + std::to_string(entry) + + "' for entry " + std::to_string(idx) + ". You can use the DefaultValueFor operation to provide a default value, or " "FilterAvailable/FilterMissing to discard/keep entries with missing values instead."}; } - template - bool CheckFilterHelper(unsigned int slot, Long64_t entry, TypeList, std::index_sequence) - { - return fFilter(GetValueChecked(slot, S, entry)...); - // avoid unused parameter warnings (gcc 12.1) - (void)slot; - (void)entry; - } - void InitSlot(TTreeReader *r, unsigned int slot) final { RDFInternal::RColumnReadersInfo info{fColumnNames, fColRegister, fIsDefine.data(), *fLoopManager}; diff --git a/tree/dataframe/inc/ROOT/RDF/RFilterBase.hxx b/tree/dataframe/inc/ROOT/RDF/RFilterBase.hxx index dffbb6a46651a..72650e599f9d5 100644 --- a/tree/dataframe/inc/ROOT/RDF/RFilterBase.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RFilterBase.hxx @@ -37,8 +37,6 @@ class RLoopManager; class RFilterBase : public RNodeBase { protected: - std::vector fLastCheckedEntry; - std::vector fLastResult = {true}; // std::vector cannot be used in a MT context safely std::vector fAccepted = {0}; std::vector fRejected = {0}; const std::string fName; @@ -49,6 +47,8 @@ protected: std::string fVariation; ///< This indicates for what variation this filter evaluates values. std::unordered_map> fVariedFilters; + std::vector> fCachedResults{}; // Vector of cached results, per slot + public: RFilterBase(RLoopManager *df, std::string_view name, const unsigned int nSlots, const RDFInternal::RColumnRegister &colRegister, const ColumnNames_t &columns, diff --git a/tree/dataframe/inc/ROOT/RDF/RFilterWithMissingValues.hxx b/tree/dataframe/inc/ROOT/RDF/RFilterWithMissingValues.hxx index 614513a5add6c..e3e518e7e5f46 100644 --- a/tree/dataframe/inc/ROOT/RDF/RFilterWithMissingValues.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RFilterWithMissingValues.hxx @@ -96,30 +96,39 @@ public: fLoopManager->EraseSuppressErrorsForMissingBranch(fColumnNames[0]); } - bool CheckFilters(unsigned int slot, Long64_t entry) final + ROOT::Internal::RDF::RMaskedEntryRange + CheckFilters(unsigned int slot, Long64_t bulkBeginEntry, std::size_t bulkSize) final { constexpr static auto cacheLineStepLong64_t = RDFInternal::CacheLineStep(); - constexpr static auto cacheLineStepint = RDFInternal::CacheLineStep(); constexpr static auto cacheLineStepULong64_t = RDFInternal::CacheLineStep(); - if (entry != fLastCheckedEntry[slot * cacheLineStepLong64_t]) { - if (!fPrevNodePtr->CheckFilters(slot, entry)) { - // a filter upstream returned false, cache the result - fLastResult[slot * cacheLineStepint] = false; + if (bulkBeginEntry == fLastCheckedEntry[slot * cacheLineStepLong64_t]) + return {fCachedResults[slot], static_cast(bulkBeginEntry)}; + + auto mask = fPrevNodePtr->CheckFilters(slot, bulkBeginEntry, bulkSize); + + fValues[slot]->Load(mask); + + std::size_t accepted{0}; + std::size_t rejected{0}; + fCachedResults[slot].clear(); + fCachedResults[slot].resize(bulkSize); + for (std::size_t i = 0; i < bulkSize; i++) { + + const bool valueIsMissing = fValues[slot]->template TryGet(i) == nullptr; + if (fDiscardEntryWithMissingValue) { + valueIsMissing ? ++rejected : ++accepted; + fCachedResults[slot][i] = mask[i] && !valueIsMissing; } else { - // evaluate this filter, cache the result - const bool valueIsMissing = fValues[slot]->template TryGet(entry) == nullptr; - if (fDiscardEntryWithMissingValue) { - valueIsMissing ? ++fRejected[slot * cacheLineStepULong64_t] : ++fAccepted[slot * cacheLineStepULong64_t]; - fLastResult[slot * cacheLineStepint] = !valueIsMissing; - } else { - valueIsMissing ? ++fAccepted[slot * cacheLineStepULong64_t] : ++fRejected[slot * cacheLineStepULong64_t]; - fLastResult[slot * cacheLineStepint] = valueIsMissing; - } + valueIsMissing ? ++accepted : ++rejected; + fCachedResults[slot][i] = mask[i] && valueIsMissing; } - fLastCheckedEntry[slot * cacheLineStepLong64_t] = entry; } - return fLastResult[slot * cacheLineStepint]; + fAccepted[slot * cacheLineStepULong64_t] += accepted; + fRejected[slot * cacheLineStepULong64_t] += rejected; + fLastCheckedEntry[slot * cacheLineStepLong64_t] = bulkBeginEntry; + + return {fCachedResults[slot], static_cast(bulkBeginEntry)}; } void InitSlot(TTreeReader *r, unsigned int slot) final diff --git a/tree/dataframe/inc/ROOT/RDF/RJittedAction.hxx b/tree/dataframe/inc/ROOT/RDF/RJittedAction.hxx index 9d31bf25d2591..fbcc1ae9402d7 100644 --- a/tree/dataframe/inc/ROOT/RDF/RJittedAction.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RJittedAction.hxx @@ -50,7 +50,7 @@ public: void SetAction(std::unique_ptr a) { fConcreteAction = std::move(a); } - void Run(unsigned int slot, Long64_t entry) final; + void Run(unsigned int, Long64_t, std::size_t) final; void Initialize() final; void InitSlot(TTreeReader *r, unsigned int slot) final; void TriggerChildrenCount() final; diff --git a/tree/dataframe/inc/ROOT/RDF/RJittedDefine.hxx b/tree/dataframe/inc/ROOT/RDF/RJittedDefine.hxx index 2bc79170de8f3..4b470d5a761af 100644 --- a/tree/dataframe/inc/ROOT/RDF/RJittedDefine.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RJittedDefine.hxx @@ -58,7 +58,7 @@ public: void InitSlot(TTreeReader *r, unsigned int slot) final; void *GetValuePtr(unsigned int slot) final; const std::type_info &GetTypeId() const final; - void Update(unsigned int slot, Long64_t entry) final; + void Update(unsigned int slot, const ROOT::Internal::RDF::RMaskedEntryRange &mask) final; void Update(unsigned int slot, const ROOT::RDF::RSampleInfo &id) final; void FinalizeSlot(unsigned int slot) final; void MakeVariations(const std::vector &variations) final; diff --git a/tree/dataframe/inc/ROOT/RDF/RJittedFilter.hxx b/tree/dataframe/inc/ROOT/RDF/RJittedFilter.hxx index b01325ed2e68e..33802ba8d5999 100644 --- a/tree/dataframe/inc/ROOT/RDF/RJittedFilter.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RJittedFilter.hxx @@ -55,7 +55,7 @@ public: void SetFilter(std::unique_ptr f); void InitSlot(TTreeReader *r, unsigned int slot) final; - bool CheckFilters(unsigned int slot, Long64_t entry) final; + ROOT::Internal::RDF::RMaskedEntryRange CheckFilters(unsigned int, Long64_t, std::size_t) final; void Report(ROOT::RDF::RCutFlowReport &) const final; void PartialReport(ROOT::RDF::RCutFlowReport &) const final; void FillReport(ROOT::RDF::RCutFlowReport &) const final; diff --git a/tree/dataframe/inc/ROOT/RDF/RJittedVariation.hxx b/tree/dataframe/inc/ROOT/RDF/RJittedVariation.hxx index 0abd61df5109d..f6b4cec863861 100644 --- a/tree/dataframe/inc/ROOT/RDF/RJittedVariation.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RJittedVariation.hxx @@ -43,7 +43,7 @@ public: void InitSlot(TTreeReader *r, unsigned int slot) final; void *GetValuePtr(unsigned int slot, const std::string &column, const std::string &variation) final; const std::type_info &GetTypeId() const final; - void Update(unsigned int slot, Long64_t entry) final; + void Update(unsigned int slot, const ROOT::Internal::RDF::RMaskedEntryRange &mask) final; void FinalizeSlot(unsigned int slot) final; }; diff --git a/tree/dataframe/inc/ROOT/RDF/RLazyDSImpl.hxx b/tree/dataframe/inc/ROOT/RDF/RLazyDSImpl.hxx index 9f31d9d4a831e..e30d6ce016711 100644 --- a/tree/dataframe/inc/ROOT/RDF/RLazyDSImpl.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RLazyDSImpl.hxx @@ -27,7 +27,8 @@ namespace ROOT::Internal::RDF { class R__CLING_PTRCHECK(off) RLazyDSColumnReader final : public ROOT::Detail::RDF::RColumnReaderBase { ROOT::Internal::RDF::TPointerHolder *fPtr; - void *GetImpl(Long64_t) final { return fPtr->GetPointer(); } + void *GetImpl(std::size_t) final { return fPtr->GetPointer(); } + void LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &) final {} public: RLazyDSColumnReader(ROOT::Internal::RDF::TPointerHolder *ptr) : fPtr(ptr) {} diff --git a/tree/dataframe/inc/ROOT/RDF/RLoopManager.hxx b/tree/dataframe/inc/ROOT/RDF/RLoopManager.hxx index 6b6af72f84331..b66a279866f25 100644 --- a/tree/dataframe/inc/ROOT/RDF/RLoopManager.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RLoopManager.hxx @@ -181,7 +181,7 @@ class RLoopManager : public RNodeBase { void RunEmptySource(); void RunDataSourceMT(); void RunDataSource(); - void RunAndCheckFilters(unsigned int slot, Long64_t entry); + void RunAndCheckFilters(unsigned int slot, Long64_t bulkBeginEntry, std::size_t bulkSize); void InitNodeSlots(TTreeReader *r, unsigned int slot); void InitNodes(); void CleanUpNodes(); @@ -224,6 +224,9 @@ class RLoopManager : public RNodeBase { std::vector fJitHelperCalls{}; std::hash fStringHasher{}; + // Assume 1-size bulk for now + std::size_t fCurrentBulkSize{1}; + public: RLoopManager(const ColumnNames_t &defaultColumns = {}); RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches); @@ -256,7 +259,7 @@ public: void Deregister(RDefineBase *definePtr); void Register(RDFInternal::RVariationBase *varPtr); void Deregister(RDFInternal::RVariationBase *varPtr); - bool CheckFilters(unsigned int, Long64_t) final; + ROOT::Internal::RDF::RMaskedEntryRange CheckFilters(unsigned int, Long64_t, std::size_t) final; unsigned int GetNSlots() const { return fNSlots; } void Report(ROOT::RDF::RCutFlowReport &rep) const final; /// End of recursive chain of calls, does nothing @@ -336,6 +339,8 @@ public: /// The task run by every thread on an entry range (known by the input TTreeReader), for the TTree data source. void TTreeThreadTask(TTreeReader &treeReader, ROOT::Internal::RSlotStack &slotStack, std::atomic &entryCount); + + std::size_t GetCurrentBulkSize() { return fCurrentBulkSize; } }; /// \brief Create an RLoopManager that reads a TChain. diff --git a/tree/dataframe/inc/ROOT/RDF/RMaskedEntryRange.hxx b/tree/dataframe/inc/ROOT/RDF/RMaskedEntryRange.hxx new file mode 100644 index 0000000000000..55da9b3caea18 --- /dev/null +++ b/tree/dataframe/inc/ROOT/RDF/RMaskedEntryRange.hxx @@ -0,0 +1,38 @@ +// Author: Enrico Guiraud, Vincenzo Eduardo Padulano + +/************************************************************************* + * Copyright (C) 1995-2026, Rene Brun and Fons Rademakers. * + * All rights reserved. * + * * + * For the licensing terms see $ROOTSYS/LICENSE. * + * For the list of contributors see $ROOTSYS/README/CREDITS. * + *************************************************************************/ + +#ifndef ROOT_RDF_RMASKEDENTRYRANGE +#define ROOT_RDF_RMASKEDENTRYRANGE + +#include + +#include +#include + +namespace ROOT::Internal::RDF { + +/// RDataFrame's internal representation of an entry range with a boolean mask. +/// The mask has static size but depending on the dynamic bulk size fewer elements could be in use: +/// do not take the size of the mask as the size of the bulk. +class RMaskedEntryRange { + ROOT::RVec fMask{}; ///< Boolean mask. Its size is set at construction time. + std::uint64_t fBegin{}; ///< Entry number of the first entry in the range this mask corresponds to. + +public: + RMaskedEntryRange(std::size_t size, bool set = true, std::uint64_t entry = 0) : fMask(size, set), fBegin(entry) {} + RMaskedEntryRange(const ROOT::RVec &mask, std::uint64_t begin) : fMask(mask), fBegin(begin) {} + const bool &operator[](std::size_t idx) const { return fMask.at(idx); } + bool &operator[](std::size_t idx) { return fMask.at(idx); } + std::uint64_t GetFirstEntry() const { return fBegin; } + void SetFirstEntry(std::uint64_t e) { fBegin = e; } +}; + +} // namespace ROOT::Internal::RDF +#endif diff --git a/tree/dataframe/inc/ROOT/RDF/RNodeBase.hxx b/tree/dataframe/inc/ROOT/RDF/RNodeBase.hxx index ce38725449333..7984e3fb6596a 100644 --- a/tree/dataframe/inc/ROOT/RDF/RNodeBase.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RNodeBase.hxx @@ -13,6 +13,8 @@ #include "RtypesCore.h" #include "TError.h" // R__ASSERT +#include +#include #include #include @@ -46,10 +48,13 @@ protected: unsigned int fNChildren{0}; ///< Number of nodes of the functional graph hanging from this object unsigned int fNStopsReceived{0}; ///< Number of times that a children node signaled to stop processing entries. std::vector fVariations; ///< List of systematic variations that affect this node. + std::vector fLastCheckedEntry; public: - RNodeBase(const std::vector &variations = {}, RLoopManager *lm = nullptr) - : fLoopManager(lm), fVariations(variations) + RNodeBase(const std::vector &variations = {}, RLoopManager *lm = nullptr, unsigned int nSlots = 1) + : fLoopManager(lm), + fVariations(variations), + fLastCheckedEntry(nSlots * ROOT::Internal::RDF::CacheLineStep(), -1) { } @@ -60,7 +65,6 @@ public: RNodeBase &operator=(RNodeBase &&) = delete; virtual ~RNodeBase() = default; - virtual bool CheckFilters(unsigned int, Long64_t) = 0; virtual void Report(ROOT::RDF::RCutFlowReport &) const = 0; virtual void PartialReport(ROOT::RDF::RCutFlowReport &) const = 0; virtual void IncrChildrenCount() = 0; @@ -87,6 +91,9 @@ public: "GetVariedFilter was called on a node type that does not implement it. This should never happen."); return nullptr; } + + virtual ROOT::Internal::RDF::RMaskedEntryRange + CheckFilters(unsigned int slot, Long64_t bulkBeginEntry, std::size_t bulkSize) = 0; }; } // ns RDF } // ns Detail diff --git a/tree/dataframe/inc/ROOT/RDF/RRange.hxx b/tree/dataframe/inc/ROOT/RDF/RRange.hxx index ea2133ec3d879..142604ba06433 100644 --- a/tree/dataframe/inc/ROOT/RDF/RRange.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RRange.hxx @@ -68,30 +68,36 @@ public: ~RRange() final { fLoopManager->Deregister(this); } /// Ranges act as filters when it comes to selecting entries that downstream nodes should process - bool CheckFilters(unsigned int slot, Long64_t entry) final + ROOT::Internal::RDF::RMaskedEntryRange + CheckFilters(unsigned int slot, Long64_t bulkBeginEntry, std::size_t bulkSize) final { - if (entry != fLastCheckedEntry) { - if (fHasStopped) - return false; - if (!fPrevNode.CheckFilters(slot, entry)) { - // a filter upstream returned false, cache the result - fLastResult = false; - } else { - // apply range filter logic, cache the result - if (fNProcessedEntries < fStart || (fStop > 0 && fNProcessedEntries >= fStop) || - (fStride != 1 && (fNProcessedEntries - fStart) % fStride != 0)) - fLastResult = false; - else - fLastResult = true; + if (bulkBeginEntry == fLastCheckedEntry[slot * ROOT::Internal::RDF::CacheLineStep()]) + return {fCachedResults[slot * RDFInternal::CacheLineStep>()], + static_cast(bulkBeginEntry)}; + + if (fHasStopped) + return {bulkSize, false, static_cast(bulkBeginEntry)}; + + fLastCheckedEntry[slot * ROOT::Internal::RDF::CacheLineStep()] = bulkBeginEntry; + auto mask = fPrevNode.CheckFilters(slot, bulkBeginEntry, bulkSize); + fCachedResults[slot * RDFInternal::CacheLineStep>()].clear(); + fCachedResults[slot * RDFInternal::CacheLineStep>()].resize(bulkSize); + for (std::size_t i = 0; i < bulkSize; ++i) { + if (mask[i]) { + fCachedResults[slot * RDFInternal::CacheLineStep>()][i] = + fNProcessedEntries >= fStart && (fStop == 0 || fNProcessedEntries < fStop) && + (fStride == 1 || (fNProcessedEntries - fStart) % fStride == 0); ++fNProcessedEntries; - if (fNProcessedEntries == fStop) { - fHasStopped = true; - fPrevNode.StopProcessing(); - } } - fLastCheckedEntry = entry; } - return fLastResult; + + if (fStop > 0 && fNProcessedEntries >= fStop) { + fHasStopped = true; + fPrevNode.StopProcessing(); + } + + return {fCachedResults[slot * RDFInternal::CacheLineStep>()], + static_cast(bulkBeginEntry)}; } // recursive chain of `Report`s diff --git a/tree/dataframe/inc/ROOT/RDF/RRangeBase.hxx b/tree/dataframe/inc/ROOT/RDF/RRangeBase.hxx index c5fbed6dc5d83..d50d32c69724f 100644 --- a/tree/dataframe/inc/ROOT/RDF/RRangeBase.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RRangeBase.hxx @@ -35,13 +35,13 @@ protected: unsigned int fStart; unsigned int fStop; unsigned int fStride; - Long64_t fLastCheckedEntry{-1}; - bool fLastResult{true}; ULong64_t fNProcessedEntries{0}; bool fHasStopped{false}; ///< True if the end of the range has been reached const unsigned int fNSlots; ///< Number of thread slots used by this node, inherited from parent node. std::unordered_map> fVariedRanges; + std::vector> fCachedResults; + public: RRangeBase(RLoopManager *implPtr, unsigned int start, unsigned int stop, unsigned int stride, const unsigned int nSlots, const std::vector &prevVariations); diff --git a/tree/dataframe/inc/ROOT/RDF/RTreeColumnReader.hxx b/tree/dataframe/inc/ROOT/RDF/RTreeColumnReader.hxx index b2dc37c8e85a1..b750a74f89bd3 100644 --- a/tree/dataframe/inc/ROOT/RDF/RTreeColumnReader.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RTreeColumnReader.hxx @@ -37,8 +37,10 @@ namespace RDF { class R__CLING_PTRCHECK(off) RTreeOpaqueColumnReader final : public ROOT::Detail::RDF::RColumnReaderBase { std::unique_ptr fTreeValue; + void *fValuePtr{nullptr}; - void *GetImpl(Long64_t) override; + void *GetImpl(std::size_t) override; + void LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &) override; public: /// Construct the RTreeColumnReader. Actual initialization is performed lazily by the Init method. @@ -56,8 +58,10 @@ public: /// RTreeColumnReader specialization for TTree values read via TTreeReaderUntypedValue class R__CLING_PTRCHECK(off) RTreeUntypedValueColumnReader final : public ROOT::Detail::RDF::RColumnReaderBase { std::unique_ptr fTreeValue; + void *fValuePtr{nullptr}; - void *GetImpl(Long64_t) override; + void *GetImpl(std::size_t) override; + void LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &) override; public: RTreeUntypedValueColumnReader(TTreeReader &r, std::string_view colName, std::string_view typeName); @@ -111,11 +115,14 @@ private: /// Whether we already printed a warning about performing a copy of the TTreeReaderArray contents bool fCopyWarningPrinted = false; - void *GetImpl(Long64_t entry) override; + void *fValuePtr{nullptr}; - void *ReadStdArray(Long64_t entry); - void *ReadStdVector(Long64_t entry); - void *ReadRVec(Long64_t entry); + void *GetImpl(std::size_t) override; + void LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &) override; + + void *LoadStdArray(Long64_t entry); + void *LoadStdVector(Long64_t entry); + void *LoadRVec(Long64_t entry); }; class R__CLING_PTRCHECK(off) RMaskedColumnReader : public ROOT::Detail::RDF::RColumnReaderBase { @@ -123,7 +130,9 @@ class R__CLING_PTRCHECK(off) RMaskedColumnReader : public ROOT::Detail::RDF::RCo std::unique_ptr> fTreeValueMask; unsigned int fMaskIndex = 0; - void *GetImpl(Long64_t) override; + void *fValuePtr{nullptr}; + void *GetImpl(std::size_t) override; + void LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &) override; public: RMaskedColumnReader(TTreeReader &r, std::unique_ptr valueReader, diff --git a/tree/dataframe/inc/ROOT/RDF/RVariation.hxx b/tree/dataframe/inc/ROOT/RDF/RVariation.hxx index 0a3bc702d7e9e..5cf18ab7709d9 100644 --- a/tree/dataframe/inc/ROOT/RDF/RVariation.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RVariation.hxx @@ -77,9 +77,9 @@ void AssignResults(ROOT::RVec &resStorage, ROOT::RVec &&tmpResults) } template -void *GetValuePtrHelper(ROOT::RVec &v, std::size_t /*colIdx*/, std::size_t varIdx) +void *GetValuePtrHelper(ROOT::RVec> &v, std::size_t /*colIdx*/, std::size_t varIdx) { - return static_cast(&v[varIdx]); + return static_cast(&v[0][varIdx]); } ///@} @@ -119,9 +119,9 @@ void AssignResults(std::vector> &resStorage, ROOT::RVec -void *GetValuePtrHelper(std::vector> &v, std::size_t colIdx, std::size_t varIdx) +void *GetValuePtrHelper(ROOT::RVec>> &v, std::size_t colIdx, std::size_t varIdx) { - return static_cast(&v[colIdx][varIdx]); + return static_cast(&v[0][colIdx][varIdx]); } ///@} @@ -151,32 +151,33 @@ class R__CLING_PTRCHECK(off) RVariation final : public RVariationBase { using Ret_t = typename CallableTraits::ret_type; using VariedCol_t = ColumnType_t; using Result_t = std::conditional_t, std::vector>>; + using ValuesPerSlot_t = std::vector>; F fExpression; - /// Per-slot storage for varied column values (for one or multiple columns depending on IsSingleColumn). - std::vector fLastResults; + // Each slot accesses a cache of values for the current bulk + ValuesPerSlot_t fCachedResultsPerSlot; /// Column readers per slot and per input column std::vector> fValues; template - auto GetValueChecked(unsigned int slot, std::size_t readerIdx, Long64_t entry) -> ColType & + auto GetValueChecked(unsigned int slot, std::size_t readerIdx, std::size_t idx) -> ColType & { - if (auto *val = fValues[slot][readerIdx]->template TryGet(entry)) + if (auto *val = fValues[slot][readerIdx]->template TryGet(idx)) return *val; throw std::out_of_range{"RDataFrame: Could not retrieve value for variation '" + fColNames[readerIdx] + - "' for entry " + std::to_string(entry) + + "' for entry " + std::to_string(idx) + ". You can use the DefaultValueFor operation to provide a default value, or " "FilterAvailable/FilterMissing to discard/keep entries with missing values instead."}; } template - void UpdateHelper(unsigned int slot, Long64_t entry, TypeList, std::index_sequence) + void UpdateHelper(unsigned int slot, std::size_t idx, TypeList, std::index_sequence) { // fExpression must return an RVec - auto &&results = fExpression(GetValueChecked(slot, S, entry)...); - (void)entry; // avoid unused parameter warnings (gcc 12.1) + auto &&results = fExpression(GetValueChecked(slot, S, idx)...); + (void)idx; // avoid unused parameter warnings (gcc 12.1) if (!ResultsSizeEq(results, fVariationNames.size(), fColNames.size(), std::integral_constant{})) { @@ -186,7 +187,8 @@ class R__CLING_PTRCHECK(off) RVariation final : public RVariationBase { std::to_string(fVariationNames.size()) + " were expected."); } - AssignResults(fLastResults[slot * CacheLineStep()], std::move(results)); + AssignResults(fCachedResultsPerSlot[slot * RDFInternal::CacheLineStep>()][0], + std::move(results)); } public: @@ -195,13 +197,17 @@ public: RLoopManager &lm, const ColumnNames_t &inputColNames) : RVariationBase(colNames, variationName, variationTags, type, defines, lm, inputColNames), fExpression(std::move(expression)), - fLastResults(lm.GetNSlots() * CacheLineStep()), + fCachedResultsPerSlot(lm.GetNSlots() * RDFInternal::CacheLineStep>()), fValues(lm.GetNSlots()) { fLoopManager->Register(this); - for (auto i = 0u; i < lm.GetNSlots(); ++i) - ResizeResults(fLastResults[i * CacheLineStep()], colNames.size(), variationTags.size()); + // Assume 1-size bulk for now + for (decltype(lm.GetNSlots()) i = 0; i < lm.GetNSlots(); ++i) { + auto &cachedResultsForThisSlot = fCachedResultsPerSlot[i * RDFInternal::CacheLineStep>()]; + cachedResultsForThisSlot.resize(1ul); + ResizeResults(cachedResultsForThisSlot[0], colNames.size(), variationTags.size()); + } } RVariation(const RVariation &) = delete; @@ -215,7 +221,8 @@ public: fLastCheckedEntry[slot * CacheLineStep()] = -1; } - /// Return the (type-erased) address of the value for the given processing slot. + /// Return the beginning of the cached results of the current bulk for the input processing slot, column and + /// variation void *GetValuePtr(unsigned int slot, const std::string &column, const std::string &variation) final { const auto colIt = std::find(fColNames.begin(), fColNames.end(), column); @@ -226,16 +233,24 @@ public: assert(varIt != fVariationNames.end()); const auto varIdx = std::distance(fVariationNames.begin(), varIt); - return GetValuePtrHelper(fLastResults[slot * CacheLineStep()], colIdx, varIdx); + return GetValuePtrHelper(fCachedResultsPerSlot[slot * RDFInternal::CacheLineStep>()], colIdx, + varIdx); } /// Update the value at the address returned by GetValuePtr with the content corresponding to the given entry - void Update(unsigned int slot, Long64_t entry) final + void Update(unsigned int slot, const ROOT::Internal::RDF::RMaskedEntryRange &mask) final { - if (entry != fLastCheckedEntry[slot * CacheLineStep()]) { - // evaluate this filter, cache the result - UpdateHelper(slot, entry, ColumnTypes_t{}, TypeInd_t{}); - fLastCheckedEntry[slot * CacheLineStep()] = entry; + if (static_cast(mask.GetFirstEntry()) == fLastCheckedEntry[slot * CacheLineStep()]) + return; + + std::for_each(fValues[slot].begin(), fValues[slot].end(), [&mask](auto *v) { v->Load(mask); }); + // Assume 1-size bulk for now + const std::size_t bulkSize = 1; + for (std::size_t i = 0; i < bulkSize; ++i) { + if (mask[i]) { + UpdateHelper(slot, i, ColumnTypes_t{}, TypeInd_t{}); + fLastCheckedEntry[slot * CacheLineStep()] = mask.GetFirstEntry(); + } } } diff --git a/tree/dataframe/inc/ROOT/RDF/RVariationBase.hxx b/tree/dataframe/inc/ROOT/RDF/RVariationBase.hxx index 34b54a9b0a245..e44e971680ed1 100644 --- a/tree/dataframe/inc/ROOT/RDF/RVariationBase.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RVariationBase.hxx @@ -14,6 +14,7 @@ #include #include // ColumnNames_t #include +#include #include #include @@ -70,7 +71,7 @@ public: const std::string &GetVariationName() const; std::string GetTypeName() const; /// Update the value at the address returned by GetValuePtr with the content corresponding to the given entry - virtual void Update(unsigned int slot, Long64_t entry) = 0; + virtual void Update(unsigned int slot, const ROOT::Internal::RDF::RMaskedEntryRange &mask) = 0; /// Clean-up operations to be performed at the end of a task. virtual void FinalizeSlot(unsigned int slot) = 0; }; diff --git a/tree/dataframe/inc/ROOT/RDF/RVariationReader.hxx b/tree/dataframe/inc/ROOT/RDF/RVariationReader.hxx index a9e6dfacaafd6..39661a5a98af8 100644 --- a/tree/dataframe/inc/ROOT/RDF/RVariationReader.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RVariationReader.hxx @@ -32,11 +32,9 @@ class R__CLING_PTRCHECK(off) RVariationReader final : public ROOT::Detail::RDF:: /// The slot this value belongs to. unsigned int fSlot = std::numeric_limits::max(); - void *GetImpl(Long64_t entry) final - { - fVariation->Update(fSlot, entry); - return fValuePtr; - } + void *GetImpl(std::size_t /*idx*/) final { return fValuePtr; } + + void LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &mask) final { fVariation->Update(fSlot, mask); } public: RVariationReader(unsigned int slot, const std::string &colName, const std::string &variationName, diff --git a/tree/dataframe/inc/ROOT/RDF/RVariedAction.hxx b/tree/dataframe/inc/ROOT/RDF/RVariedAction.hxx index a14ec6df9ac3b..899bd2f10f4e1 100644 --- a/tree/dataframe/inc/ROOT/RDF/RVariedAction.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RVariedAction.hxx @@ -141,31 +141,36 @@ public: } template - auto GetValueChecked(unsigned int slot, unsigned int varIdx, std::size_t readerIdx, Long64_t entry) -> ColType & + auto GetValueChecked(unsigned int slot, unsigned int varIdx, std::size_t readerIdx, std::size_t idx) -> ColType & { - if (auto *val = fInputValues[slot][varIdx][readerIdx]->template TryGet(entry)) + if (auto *val = fInputValues[slot][varIdx][readerIdx]->template TryGet(idx)) return *val; throw std::out_of_range{"RDataFrame: Varied action (" + fHelpers[0].GetActionName() + ") could not retrieve value for column '" + fColumnNames[readerIdx] + "' for entry " + - std::to_string(entry) + + std::to_string(idx) + ". You can use the DefaultValueFor operation to provide a default value, or " "FilterAvailable/FilterMissing to discard/keep entries with missing values instead."}; } template - void CallExec(unsigned int slot, unsigned int varIdx, Long64_t entry, TypeList, + void CallExec(unsigned int slot, unsigned int varIdx, std::size_t idx, TypeList, std::index_sequence) { - fHelpers[varIdx].Exec(slot, GetValueChecked(slot, varIdx, ReaderIdxs, entry)...); - (void)entry; + fHelpers[varIdx].Exec(slot, GetValueChecked(slot, varIdx, ReaderIdxs, idx)...); + (void)idx; } - void Run(unsigned int slot, Long64_t entry) final + void Run(unsigned int slot, Long64_t bulkBeginEntry, std::size_t bulkSize) final { for (auto varIdx = 0u; varIdx < GetVariations().size(); ++varIdx) { - if (fPrevNodes[varIdx]->CheckFilters(slot, entry)) - CallExec(slot, varIdx, entry, ColumnTypes_t{}, TypeInd_t{}); + const auto mask = fPrevNodes[varIdx]->CheckFilters(slot, bulkBeginEntry, bulkSize); + std::for_each(fInputValues[slot][varIdx].begin(), fInputValues[slot][varIdx].end(), + [&mask](auto *v) { v->Load(mask); }); + + // Assume 1-size bulk for now + if (mask[0]) + CallExec(slot, varIdx, /*idx=*/0u, ColumnTypes_t{}, TypeInd_t{}); } } diff --git a/tree/dataframe/inc/ROOT/RDF/SnapshotHelpers.hxx b/tree/dataframe/inc/ROOT/RDF/SnapshotHelpers.hxx index 698d58f697a85..fdaa4d3512b16 100644 --- a/tree/dataframe/inc/ROOT/RDF/SnapshotHelpers.hxx +++ b/tree/dataframe/inc/ROOT/RDF/SnapshotHelpers.hxx @@ -299,7 +299,8 @@ public: void InitTask(TTreeReader *, unsigned int slot); - void Exec(unsigned int /*slot*/, const std::vector &values, std::vector const &filterPassed); + void Exec(unsigned int /*slot*/, const std::vector &values, + std::vector const &filterPassed); /// Nothing to do. All initialisations run in the constructor or InitTask(). void Initialize() {} diff --git a/tree/dataframe/inc/ROOT/RSqliteDS.hxx b/tree/dataframe/inc/ROOT/RSqliteDS.hxx index 5f46a5139dbda..0452e43bef097 100644 --- a/tree/dataframe/inc/ROOT/RSqliteDS.hxx +++ b/tree/dataframe/inc/ROOT/RSqliteDS.hxx @@ -22,7 +22,8 @@ namespace ROOT::Internal::RDF { class R__CLING_PTRCHECK(off) RSqliteDSColumnReader final : public ROOT::Detail::RDF::RColumnReaderBase { void *fValuePtr; - void *GetImpl(Long64_t) final { return fValuePtr; } + void *GetImpl(std::size_t) final { return fValuePtr; } + void LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &) final {} public: RSqliteDSColumnReader(void *valuePtr) : fValuePtr(valuePtr) {} diff --git a/tree/dataframe/inc/ROOT/RTrivialDS.hxx b/tree/dataframe/inc/ROOT/RTrivialDS.hxx index d7b7de89da425..340c738f1aed0 100644 --- a/tree/dataframe/inc/ROOT/RTrivialDS.hxx +++ b/tree/dataframe/inc/ROOT/RTrivialDS.hxx @@ -17,7 +17,8 @@ namespace ROOT::Internal::RDF { class R__CLING_PTRCHECK(off) RTrivialDSColumnReader final : public ROOT::Detail::RDF::RColumnReaderBase { ULong64_t *fValuePtr; - void *GetImpl(Long64_t) final { return fValuePtr; } + void *GetImpl(std::size_t) final { return fValuePtr; } + void LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &) final {} public: RTrivialDSColumnReader(ULong64_t *valuePtr) : fValuePtr(valuePtr) {} diff --git a/tree/dataframe/inc/ROOT/RVecDS.hxx b/tree/dataframe/inc/ROOT/RVecDS.hxx index 18f36f6942cd9..7bda21c71ad95 100644 --- a/tree/dataframe/inc/ROOT/RVecDS.hxx +++ b/tree/dataframe/inc/ROOT/RVecDS.hxx @@ -30,7 +30,8 @@ namespace ROOT::Internal::RDF { class R__CLING_PTRCHECK(off) RVecDSColumnReader final : public ROOT::Detail::RDF::RColumnReaderBase { TPointerHolder *fPtrHolder; - void *GetImpl(Long64_t) final { return fPtrHolder->GetPointer(); } + void *GetImpl(std::size_t) final { return fPtrHolder->GetPointer(); } + void LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &) final {} public: RVecDSColumnReader(TPointerHolder *ptrHolder) : fPtrHolder(ptrHolder) {} diff --git a/tree/dataframe/src/RDFSnapshotHelpers.cxx b/tree/dataframe/src/RDFSnapshotHelpers.cxx index 3764a447627ca..70ddb08191598 100644 --- a/tree/dataframe/src/RDFSnapshotHelpers.cxx +++ b/tree/dataframe/src/RDFSnapshotHelpers.cxx @@ -1222,9 +1222,11 @@ void ROOT::Internal::RDF::SnapshotHelperWithVariations::InitTask(TTreeReader *, /// Connect all output fields to the values pointed to by `values`, fill the output dataset, /// call the Fill of the output tree, and clear the mask bits that show whether a variation was reached. -void ROOT::Internal::RDF::SnapshotHelperWithVariations::Exec(unsigned int /*slot*/, const std::vector &values, - std::vector const &filterPassed) +void ROOT::Internal::RDF::SnapshotHelperWithVariations::Exec( + unsigned int /*slot*/, const std::vector &values, + std::vector const &filterPassed) { + // Assume 1-size bulk for now // Rebind branch pointers to RDF values assert(fBranchData.size() == values.size()); for (std::size_t i = 0; i < values.size(); i++) { @@ -1234,14 +1236,14 @@ void ROOT::Internal::RDF::SnapshotHelperWithVariations::Exec(unsigned int /*slot SetBranchesHelper(fInputTree, *fOutputHandle->fTree, fBranchData, i, fOptions.fBasketSize, values[i]); } else { // Nominal will always be written, systematics only if needed - if (variationIndex == 0 || filterPassed[variationIndex]) { + if (variationIndex == 0 || filterPassed[variationIndex][0]) { const bool fundamentalType = fBranchData[i].WriteValueIfFundamental(values[i]); if (!fundamentalType) { SetBranchesHelper(fInputTree, *fOutputHandle->fTree, fBranchData, i, fOptions.fBasketSize, values[i]); } } - if (filterPassed[variationIndex]) { + if (filterPassed[variationIndex][0]) { fOutputHandle->SetMaskBit(variationIndex); } } diff --git a/tree/dataframe/src/RFilterBase.cxx b/tree/dataframe/src/RFilterBase.cxx index bf42e8199925d..248590cbfc462 100644 --- a/tree/dataframe/src/RFilterBase.cxx +++ b/tree/dataframe/src/RFilterBase.cxx @@ -19,12 +19,15 @@ using namespace ROOT::Detail::RDF; RFilterBase::RFilterBase(RLoopManager *implPtr, std::string_view name, const unsigned int nSlots, const RDFInternal::RColumnRegister &colRegister, const ColumnNames_t &columns, const std::vector &prevVariations, const std::string &variation) - : RNodeBase(ROOT::Internal::RDF::Union(colRegister.GetVariationDeps(columns), prevVariations), implPtr), - fLastCheckedEntry(nSlots * RDFInternal::CacheLineStep(), -1), - fLastResult(nSlots * RDFInternal::CacheLineStep()), + : RNodeBase(ROOT::Internal::RDF::Union(colRegister.GetVariationDeps(columns), prevVariations), implPtr, nSlots), fAccepted(nSlots * RDFInternal::CacheLineStep()), - fRejected(nSlots * RDFInternal::CacheLineStep()), fName(name), fColumnNames(columns), - fColRegister(colRegister), fIsDefine(columns.size()), fVariation(variation) + fRejected(nSlots * RDFInternal::CacheLineStep()), + fName(name), + fColumnNames(columns), + fColRegister(colRegister), + fIsDefine(columns.size()), + fVariation(variation), + fCachedResults(nSlots * RDFInternal::CacheLineStep>()) { const auto nColumns = fColumnNames.size(); for (auto i = 0u; i < nColumns; ++i) { diff --git a/tree/dataframe/src/RJittedAction.cxx b/tree/dataframe/src/RJittedAction.cxx index 65b6f392b6360..2d95881b50dc9 100644 --- a/tree/dataframe/src/RJittedAction.cxx +++ b/tree/dataframe/src/RJittedAction.cxx @@ -68,10 +68,10 @@ RJittedAction::RJittedAction(RLoopManager &lm, const ROOT::RDF::ColumnNames_t &c RJittedAction::~RJittedAction() {} -void RJittedAction::Run(unsigned int slot, Long64_t entry) +void RJittedAction::Run(unsigned int slot, Long64_t bulkBeginEntry, std::size_t bulkSize) { assert(fConcreteAction != nullptr); - fConcreteAction->Run(slot, entry); + fConcreteAction->Run(slot, bulkBeginEntry, bulkSize); } void RJittedAction::Initialize() diff --git a/tree/dataframe/src/RJittedDefine.cxx b/tree/dataframe/src/RJittedDefine.cxx index 0ead1a3b0ce1d..e10dd0b47f852 100644 --- a/tree/dataframe/src/RJittedDefine.cxx +++ b/tree/dataframe/src/RJittedDefine.cxx @@ -39,10 +39,10 @@ const std::type_info &RJittedDefine::GetTypeId() const "retrieved. This should never happen, please report this as a bug."); } -void RJittedDefine::Update(unsigned int slot, Long64_t entry) +void RJittedDefine::Update(unsigned int slot, const ROOT::Internal::RDF::RMaskedEntryRange &mask) { assert(fConcreteDefine != nullptr); - fConcreteDefine->Update(slot, entry); + fConcreteDefine->Update(slot, mask); } void RJittedDefine::Update(unsigned int slot, const ROOT::RDF::RSampleInfo &id) diff --git a/tree/dataframe/src/RJittedFilter.cxx b/tree/dataframe/src/RJittedFilter.cxx index 9e94e74e1c414..25c33a26efd90 100644 --- a/tree/dataframe/src/RJittedFilter.cxx +++ b/tree/dataframe/src/RJittedFilter.cxx @@ -55,10 +55,11 @@ void RJittedFilter::InitSlot(TTreeReader *r, unsigned int slot) fConcreteFilter->InitSlot(r, slot); } -bool RJittedFilter::CheckFilters(unsigned int slot, Long64_t entry) +ROOT::Internal::RDF::RMaskedEntryRange +RJittedFilter::CheckFilters(unsigned int slot, Long64_t bulkBeginEntry, std::size_t bulkSize) { assert(fConcreteFilter != nullptr); - return fConcreteFilter->CheckFilters(slot, entry); + return fConcreteFilter->CheckFilters(slot, bulkBeginEntry, bulkSize); } void RJittedFilter::Report(ROOT::RDF::RCutFlowReport &cr) const diff --git a/tree/dataframe/src/RJittedVariation.cxx b/tree/dataframe/src/RJittedVariation.cxx index 50f43c7db64ff..eb0257f474ccd 100644 --- a/tree/dataframe/src/RJittedVariation.cxx +++ b/tree/dataframe/src/RJittedVariation.cxx @@ -34,10 +34,10 @@ const std::type_info &RJittedVariation::GetTypeId() const return fConcreteVariation->GetTypeId(); } -void RJittedVariation::Update(unsigned int slot, Long64_t entry) +void RJittedVariation::Update(unsigned int slot, const ROOT::Internal::RDF::RMaskedEntryRange &mask) { assert(fConcreteVariation != nullptr); - fConcreteVariation->Update(slot, entry); + fConcreteVariation->Update(slot, mask); } void RJittedVariation::FinalizeSlot(unsigned int slot) diff --git a/tree/dataframe/src/RLoopManager.cxx b/tree/dataframe/src/RLoopManager.cxx index 67e1e9e4157b4..55d69612d2fb2 100644 --- a/tree/dataframe/src/RLoopManager.cxx +++ b/tree/dataframe/src/RLoopManager.cxx @@ -535,8 +535,12 @@ void RLoopManager::RunEmptySourceMT() R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({"an empty source", range.first, range.second, slot}); try { UpdateSampleInfo(slot, range); - for (auto currEntry = range.first; currEntry < range.second; ++currEntry) { - RunAndCheckFilters(slot, currEntry); + auto bulkBeginEntry = range.first; + std::size_t bulkSize = std::min(GetCurrentBulkSize(), static_cast(range.second - bulkBeginEntry)); + while (bulkBeginEntry < range.second) { + RunAndCheckFilters(slot, bulkBeginEntry, bulkSize); + bulkBeginEntry += bulkSize; + bulkSize = std::min(GetCurrentBulkSize(), static_cast(range.second - bulkBeginEntry)); } } catch (...) { // Error might throw in experiment frameworks like CMSSW @@ -560,9 +564,13 @@ void RLoopManager::RunEmptySource() RCallCleanUpTask cleanup(*this); try { UpdateSampleInfo(/*slot*/ 0, fEmptyEntryRange); - for (ULong64_t currEntry = fEmptyEntryRange.first; - currEntry < fEmptyEntryRange.second && fNStopsReceived < fNChildren; ++currEntry) { - RunAndCheckFilters(0, currEntry); + auto bulkBeginEntry = fEmptyEntryRange.first; + std::size_t bulkSize = + std::min(GetCurrentBulkSize(), static_cast(fEmptyEntryRange.second - bulkBeginEntry)); + while (bulkBeginEntry < fEmptyEntryRange.second) { + RunAndCheckFilters(0, bulkBeginEntry, bulkSize); + bulkBeginEntry += bulkSize; + bulkSize = std::min(GetCurrentBulkSize(), static_cast(fEmptyEntryRange.second - bulkBeginEntry)); } } catch (...) { std::cerr << "RDataFrame::Run: event loop was interrupted\n"; @@ -656,11 +664,15 @@ void RLoopManager::RunDataSource() const auto start = range.first; const auto end = range.second; R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, 0u}); - for (auto entry = start; entry < end && fNStopsReceived < fNChildren; ++entry) { - if (fDataSource->SetEntry(0u, entry)) { - RunAndCheckFilters(0u, entry); + auto bulkBeginEntry = start; + std::size_t bulkSize = std::min(GetCurrentBulkSize(), static_cast(end - bulkBeginEntry)); + while (bulkBeginEntry < end && fNStopsReceived < fNChildren) { + if (fDataSource->SetEntry(0u, bulkBeginEntry)) { + RunAndCheckFilters(0u, bulkBeginEntry, bulkSize); } - processedEntries++; + bulkBeginEntry += bulkSize; + processedEntries += bulkSize; + bulkSize = std::min(GetCurrentBulkSize(), static_cast(end - bulkBeginEntry)); } } } catch (...) { @@ -709,7 +721,7 @@ void RLoopManager::RunDataSourceMT() /// Execute actions and make sure named filters are called for each event. /// Named filters must be called even if the analysis logic would not require it, lest they report confusing results. -void RLoopManager::RunAndCheckFilters(unsigned int slot, Long64_t entry) +void RLoopManager::RunAndCheckFilters(unsigned int slot, Long64_t bulkBeginEntry, std::size_t bulkSize) { // data-block callbacks run before the rest of the graph if (fNewSampleNotifier.CheckFlag(slot)) { @@ -719,9 +731,9 @@ void RLoopManager::RunAndCheckFilters(unsigned int slot, Long64_t entry) } for (auto *actionPtr : fBookedActions) - actionPtr->Run(slot, entry); + actionPtr->Run(slot, bulkBeginEntry, bulkSize); for (auto *namedFilterPtr : fBookedNamedFilters) - namedFilterPtr->CheckFilters(slot, entry); + namedFilterPtr->CheckFilters(slot, bulkBeginEntry, bulkSize); for (auto &callback : fCallbacksEveryNEvents) callback(slot); } @@ -1058,9 +1070,10 @@ void RLoopManager::Deregister(RDFInternal::RVariationBase *v) } // dummy call, end of recursive chain of calls -bool RLoopManager::CheckFilters(unsigned int, Long64_t) +ROOT::Internal::RDF::RMaskedEntryRange +RLoopManager::CheckFilters(unsigned int, Long64_t bulkBeginEntry, std::size_t bulkSize) { - return true; + return ROOT::Internal::RDF::RMaskedEntryRange{bulkSize, true, static_cast(bulkBeginEntry)}; } /// Call `FillReport` on all booked filters @@ -1358,10 +1371,14 @@ void ROOT::Detail::RDF::RLoopManager::DataSourceThreadTask(const std::pairGetLabel(), start, end, slot}); try { - for (auto entry = start; entry < end; ++entry) { - if (fDataSource->SetEntry(slot, entry)) { - RunAndCheckFilters(slot, entry); + auto bulkBeginEntry = start; + std::size_t bulkSize = std::min(GetCurrentBulkSize(), static_cast(end - bulkBeginEntry)); + while (bulkBeginEntry < end) { + if (fDataSource->SetEntry(slot, bulkBeginEntry)) { + RunAndCheckFilters(slot, bulkBeginEntry, bulkSize); } + bulkBeginEntry += bulkSize; + bulkSize = std::min(GetCurrentBulkSize(), static_cast(end - bulkBeginEntry)); } } catch (...) { std::cerr << "RDataFrame::Run: event loop was interrupted\n"; @@ -1397,7 +1414,9 @@ void ROOT::Detail::RDF::RLoopManager::TTreeThreadTask(TTreeReader &treeReader, R if (fNewSampleNotifier.CheckFlag(slot)) { UpdateSampleInfo(slot, treeReader); } - RunAndCheckFilters(slot, count++); + std::size_t curBulkSize = std::min(GetCurrentBulkSize(), static_cast(end - start)); + RunAndCheckFilters(slot, count, curBulkSize); + count += curBulkSize; } } catch (...) { std::cerr << "RDataFrame::Run: event loop was interrupted\n"; diff --git a/tree/dataframe/src/RNTupleDS.cxx b/tree/dataframe/src/RNTupleDS.cxx index 82c72ee81102e..517737eee1a01 100644 --- a/tree/dataframe/src/RNTupleDS.cxx +++ b/tree/dataframe/src/RNTupleDS.cxx @@ -204,7 +204,6 @@ class RNTupleColumnReader : public ROOT::Detail::RDF::RColumnReaderBase { std::unique_ptr fField; ///< The field backing the RDF column std::unique_ptr fValue; ///< The memory location used to read from fField std::shared_ptr fValuePtr; ///< Used to reuse the object created by fValue when reconnecting sources - Long64_t fLastEntry = -1; ///< Last entry number that was read /// For chains, the logical entry and the physical entry in any particular file can be different. /// The entry offset stores the logical entry number (sum of all previous physical entries) when a file of the /// corresponding data source was opened. @@ -217,7 +216,6 @@ class RNTupleColumnReader : public ROOT::Detail::RDF::RColumnReaderBase { /// Connect the field and its subfields to the page source void Connect(RPageSource &source, Long64_t entryOffset) { - assert(fLastEntry == -1); fEntryOffset = entryOffset; @@ -267,16 +265,16 @@ class RNTupleColumnReader : public ROOT::Detail::RDF::RColumnReaderBase { } fValue = nullptr; fField = nullptr; - fLastEntry = -1; } - void *GetImpl(Long64_t entry) final + void *GetImpl(std::size_t /*idx*/) final { return fValue->GetPtr().get(); } + + void LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &mask) final { - if (entry != fLastEntry) { - fValue->Read(entry - fEntryOffset); - fLastEntry = entry; + // Assume 1-size bulk for now + if (mask[0]) { + fValue->Read(mask.GetFirstEntry() - fEntryOffset); } - return fValue->GetPtr().get(); } }; } // namespace ROOT::Internal::RDF diff --git a/tree/dataframe/src/RRangeBase.cxx b/tree/dataframe/src/RRangeBase.cxx index 6d7f0fe01d3f8..8b0f91c0fc0e9 100644 --- a/tree/dataframe/src/RRangeBase.cxx +++ b/tree/dataframe/src/RRangeBase.cxx @@ -14,13 +14,17 @@ using ROOT::Detail::RDF::RRangeBase; RRangeBase::RRangeBase(RLoopManager *implPtr, unsigned int start, unsigned int stop, unsigned int stride, const unsigned int nSlots, const std::vector &prevVariations) - : RNodeBase(prevVariations, implPtr), fStart(start), fStop(stop), fStride(stride), fNSlots(nSlots) + : RNodeBase(prevVariations, implPtr, nSlots), + fStart(start), + fStop(stop), + fStride(stride), + fNSlots(nSlots), + fCachedResults(nSlots * ROOT::Internal::RDF::CacheLineStep>()) { } void RRangeBase::InitNode() { - fLastCheckedEntry = -1; fNProcessedEntries = 0; fHasStopped = false; } diff --git a/tree/dataframe/src/RTreeColumnReader.cxx b/tree/dataframe/src/RTreeColumnReader.cxx index 0786bb1a7c6af..6a6c35c2965c0 100644 --- a/tree/dataframe/src/RTreeColumnReader.cxx +++ b/tree/dataframe/src/RTreeColumnReader.cxx @@ -5,9 +5,16 @@ #include -void *ROOT::Internal::RDF::RTreeOpaqueColumnReader::GetImpl(Long64_t) +void *ROOT::Internal::RDF::RTreeOpaqueColumnReader::GetImpl(std::size_t) { - return fTreeValue->GetAddress(); + return fValuePtr; +} + +void ROOT::Internal::RDF::RTreeOpaqueColumnReader::LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &mask) +{ + // Assume size-1 bulk for now + if (mask[0]) + fValuePtr = fTreeValue->GetAddress(); } ROOT::Internal::RDF::RTreeOpaqueColumnReader::RTreeOpaqueColumnReader(TTreeReader &r, std::string_view colName) @@ -17,9 +24,16 @@ ROOT::Internal::RDF::RTreeOpaqueColumnReader::RTreeOpaqueColumnReader(TTreeReade ROOT::Internal::RDF::RTreeOpaqueColumnReader::~RTreeOpaqueColumnReader() = default; -void *ROOT::Internal::RDF::RTreeUntypedValueColumnReader::GetImpl(Long64_t) +void *ROOT::Internal::RDF::RTreeUntypedValueColumnReader::GetImpl(std::size_t) { - return fTreeValue->Get(); + return fValuePtr; +} + +void ROOT::Internal::RDF::RTreeUntypedValueColumnReader::LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &mask) +{ + // Assume size-1 bulk for now + if (mask[0]) + fValuePtr = fTreeValue->Get(); } ROOT::Internal::RDF::RTreeUntypedValueColumnReader::RTreeUntypedValueColumnReader(TTreeReader &r, @@ -31,7 +45,7 @@ ROOT::Internal::RDF::RTreeUntypedValueColumnReader::RTreeUntypedValueColumnReade ROOT::Internal::RDF::RTreeUntypedValueColumnReader::~RTreeUntypedValueColumnReader() = default; -void *ROOT::Internal::RDF::RTreeUntypedArrayColumnReader::ReadStdArray(Long64_t entry) +void *ROOT::Internal::RDF::RTreeUntypedArrayColumnReader::LoadStdArray(Long64_t entry) { if (entry == fLastEntry) return fRVec.data(); // We return the RVec we already created @@ -61,7 +75,7 @@ void *ROOT::Internal::RDF::RTreeUntypedArrayColumnReader::ReadStdArray(Long64_t return fRVec.data(); } -void *ROOT::Internal::RDF::RTreeUntypedArrayColumnReader::ReadStdVector(Long64_t entry) +void *ROOT::Internal::RDF::RTreeUntypedArrayColumnReader::LoadStdVector(Long64_t entry) { if (entry == fLastEntry) return &fStdVector; // We return the std::vector we already created @@ -95,7 +109,7 @@ void *ROOT::Internal::RDF::RTreeUntypedArrayColumnReader::ReadStdVector(Long64_t return &fStdVector; } -void *ROOT::Internal::RDF::RTreeUntypedArrayColumnReader::ReadRVec(Long64_t entry) +void *ROOT::Internal::RDF::RTreeUntypedArrayColumnReader::LoadRVec(Long64_t entry) { if (entry == fLastEntry) return &fRVec; // We return the RVec we already created @@ -160,15 +174,22 @@ void *ROOT::Internal::RDF::RTreeUntypedArrayColumnReader::ReadRVec(Long64_t entr return &fRVec; } -void *ROOT::Internal::RDF::RTreeUntypedArrayColumnReader::GetImpl(Long64_t entry) +void ROOT::Internal::RDF::RTreeUntypedArrayColumnReader::LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &mask) { - if (fCollectionType == ECollectionType::kStdArray) - return ReadStdArray(entry); - - if (fCollectionType == ECollectionType::kStdVector) - return ReadStdVector(entry); + // Assume size-1 bulk for now + if (mask[0]) { + if (fCollectionType == ECollectionType::kStdArray) + fValuePtr = LoadStdArray(mask.GetFirstEntry()); + else if (fCollectionType == ECollectionType::kStdVector) + fValuePtr = LoadStdVector(mask.GetFirstEntry()); + else + fValuePtr = LoadRVec(mask.GetFirstEntry()); + } +} - return ReadRVec(entry); +void *ROOT::Internal::RDF::RTreeUntypedArrayColumnReader::GetImpl(std::size_t) +{ + return fValuePtr; } ROOT::Internal::RDF::RTreeUntypedArrayColumnReader::RTreeUntypedArrayColumnReader(TTreeReader &r, @@ -193,11 +214,24 @@ ROOT::Internal::RDF::RMaskedColumnReader::RMaskedColumnReader( ROOT::Internal::RDF::RMaskedColumnReader::~RMaskedColumnReader() = default; -void *ROOT::Internal::RDF::RMaskedColumnReader::GetImpl(Long64_t event) +void *ROOT::Internal::RDF::RMaskedColumnReader::GetImpl(std::size_t) { - const std::bitset<64> mask{*fTreeValueMask->Get()}; - if (mask.test(fMaskIndex) == false) - return nullptr; + return fValuePtr; +} + +void ROOT::Internal::RDF::RMaskedColumnReader::LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &mask) +{ + if (!mask[0]) { + fValuePtr = nullptr; + return; + } + + const std::bitset<64> treeMask{*fTreeValueMask->Get()}; + if (treeMask.test(fMaskIndex) == false) { + fValuePtr = nullptr; + return; + } - return fValueReader->TryGet(event); + fValueReader->Load(mask); + fValuePtr = fValueReader->TryGet(/*idx=*/0u); } diff --git a/tree/dataframe/test/RArraysDS.hxx b/tree/dataframe/test/RArraysDS.hxx index 5e0e654c6f43d..38b54a06d0402 100644 --- a/tree/dataframe/test/RArraysDS.hxx +++ b/tree/dataframe/test/RArraysDS.hxx @@ -12,7 +12,8 @@ class R__CLING_PTRCHECK(off) RArraysDSVarReader final : public ROOT::Detail::RDF::RColumnReaderBase { std::vector *fPtr = nullptr; - void *GetImpl(Long64_t) final { return fPtr; } + void *GetImpl(std::size_t) final { return fPtr; } + void LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &) final {} public: RArraysDSVarReader(std::vector &v) : fPtr(&v) {} @@ -21,11 +22,12 @@ public: class R__CLING_PTRCHECK(off) RArraysDSVarSizeReader final : public ROOT::Detail::RDF::RColumnReaderBase { std::vector *fPtr = nullptr; std::size_t fSize = 0; - void *GetImpl(Long64_t) final + void *GetImpl(std::size_t) final { fSize = fPtr->size(); return &fSize; } + void LoadImpl(const ROOT::Internal::RDF::RMaskedEntryRange &) final {} public: RArraysDSVarSizeReader(std::vector &v) : fPtr(&v) {}