Skip to content

Commit cc9b516

Browse files
committed
[df] Split value reading into Load+Get
Sets the stage for future bulk reading where the actual values that need to be passed to downstream nodes may need to get stored in a separate location.
1 parent 61244e7 commit cc9b516

28 files changed

Lines changed: 234 additions & 153 deletions

tree/dataframe/inc/ROOT/RCsvDS.hxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
namespace ROOT::Internal::RDF {
2828
class R__CLING_PTRCHECK(off) RCsvDSColumnReader final : public ROOT::Detail::RDF::RColumnReaderBase {
2929
void *fValuePtr;
30-
void *GetImpl(Long64_t) final { return fValuePtr; }
30+
void *GetImpl(std::size_t) final { return fValuePtr; }
31+
void LoadImpl(Long64_t, bool) final {}
3132

3233
public:
3334
RCsvDSColumnReader(void *valuePtr) : fValuePtr(valuePtr) {}

tree/dataframe/inc/ROOT/RDF/RAction.hxx

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -99,31 +99,33 @@ public:
9999
}
100100

101101
template <typename ColType>
102-
auto GetValueChecked(unsigned int slot, std::size_t readerIdx, Long64_t entry) -> ColType &
102+
auto GetValueChecked(unsigned int slot, std::size_t readerIdx, std::size_t idx) -> ColType &
103103
{
104-
if (auto *val = fValues[slot][readerIdx]->template TryGet<ColType>(entry))
104+
if (auto *val = fValues[slot][readerIdx]->template TryGet<ColType>(idx))
105105
return *val;
106106

107107
throw std::out_of_range{"RDataFrame: Action (" + fHelper.GetActionName() +
108108
") could not retrieve value for column '" + fColumnNames[readerIdx] + "' for entry " +
109-
std::to_string(entry) +
109+
std::to_string(idx) +
110110
". You can use the DefaultValueFor operation to provide a default value, or "
111111
"FilterAvailable/FilterMissing to discard/keep entries with missing values instead."};
112112
}
113113

114114
template <typename... ColTypes, std::size_t... S>
115-
void CallExec(unsigned int slot, Long64_t entry, TypeList<ColTypes...>, std::index_sequence<S...>)
115+
void CallExec(unsigned int slot, std::size_t idx, TypeList<ColTypes...>, std::index_sequence<S...>)
116116
{
117117
ROOT::Internal::RDF::CallGuaranteedOrder{[&](auto &&...args) { return fHelper.Exec(slot, args...); },
118-
GetValueChecked<ColTypes>(slot, S, entry)...};
119-
(void)entry; // avoid unused parameter warning (gcc 12.1)
118+
GetValueChecked<ColTypes>(slot, S, idx)...};
119+
(void)idx; // avoid unused parameter warning (gcc 12.1)
120120
}
121121

122122
void Run(unsigned int slot, Long64_t entry) final
123123
{
124-
// check if entry passes all filters
125-
if (fPrevNode.CheckFilters(slot, entry))
126-
CallExec(slot, entry, ColumnTypes_t{}, TypeInd_t{});
124+
const auto mask = fPrevNode.CheckFilters(slot, entry);
125+
std::for_each(fValues[slot].begin(), fValues[slot].end(), [entry, mask](auto *v) { v->Load(entry, mask); });
126+
127+
if (mask)
128+
CallExec(slot, /*idx=*/0u, ColumnTypes_t{}, TypeInd_t{});
127129
}
128130

129131
void TriggerChildrenCount() final { fPrevNode.IncrChildrenCount(); }

tree/dataframe/inc/ROOT/RDF/RActionSnapshot.hxx

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -166,27 +166,27 @@ public:
166166
fHelper.InitTask(r, slot);
167167
}
168168

169-
void *GetValue(unsigned int slot, std::size_t readerIdx, Long64_t entry)
169+
void *GetValue(unsigned int slot, std::size_t readerIdx, std::size_t idx)
170170
{
171171
assert(slot < fValues.size());
172172
assert(readerIdx < fValues[slot].size());
173-
if (auto *val = fValues[slot][readerIdx]->template TryGet<void>(entry))
173+
if (auto *val = fValues[slot][readerIdx]->template TryGet<void>(idx))
174174
return val;
175175

176176
throw std::out_of_range{"RDataFrame: Action (" + fHelper.GetActionName() +
177177
") could not retrieve value for column '" + fColumnNames[readerIdx] + "' for entry " +
178-
std::to_string(entry) +
178+
std::to_string(idx) +
179179
". You can use the DefaultValueFor operation to provide a default value, or "
180180
"FilterAvailable/FilterMissing to discard/keep entries with missing values instead."};
181181
}
182182

183-
void CallExec(unsigned int slot, Long64_t entry)
183+
void CallExec(unsigned int slot, std::size_t idx)
184184
{
185185
std::vector<void *> untypedValues;
186186
auto nReaders = fValues[slot].size();
187187
untypedValues.reserve(nReaders);
188188
for (decltype(nReaders) readerIdx{}; readerIdx < nReaders; readerIdx++)
189-
untypedValues.push_back(GetValue(slot, readerIdx, entry));
189+
untypedValues.push_back(GetValue(slot, readerIdx, idx));
190190

191191
fHelper.Exec(slot, untypedValues);
192192
}
@@ -207,14 +207,17 @@ public:
207207
std::vector<void *> untypedValues;
208208
auto nReaders = fValues[slot].size();
209209
untypedValues.reserve(nReaders);
210+
std::for_each(fValues[slot].begin(), fValues[slot].end(), [entry](auto *v) { v->Load(entry, true); });
210211
for (decltype(nReaders) readerIdx{}; readerIdx < nReaders; readerIdx++)
211-
untypedValues.push_back(GetValue(slot, readerIdx, entry));
212+
untypedValues.push_back(GetValue(slot, readerIdx, /*idx=*/0u));
212213

213214
fHelper.Exec(slot, untypedValues, filterPassed);
214215
}
215216
} else {
216-
if (fPrevNodes.front()->CheckFilters(slot, entry))
217-
CallExec(slot, entry);
217+
const auto mask = fPrevNodes.front()->CheckFilters(slot, entry);
218+
std::for_each(fValues[slot].begin(), fValues[slot].end(), [entry, mask](auto *v) { v->Load(entry, mask); });
219+
if (mask)
220+
CallExec(slot, /*idx=*/0u);
218221
}
219222
}
220223

tree/dataframe/inc/ROOT/RDF/RColumnReaderBase.hxx

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,38 @@ This pure virtual class provides a common base class for the different column re
2626
RDSColumnReader.
2727
**/
2828
class R__CLING_PTRCHECK(off) RColumnReaderBase {
29+
Long64_t fLoadedEntry = -1;
30+
2931
public:
3032
virtual ~RColumnReaderBase() = default;
3133

34+
/// Load the column value for the given entry.
35+
/// \param entry The entry number to load.
36+
/// \param mask The entry mask. Values will be loaded only for entries for which the mask equals true.
37+
void Load(Long64_t entry, bool mask)
38+
{
39+
// For now, as `mask` is just a single boolean, as an optimization we can return early here if `mask == false`.
40+
if (mask) {
41+
fLoadedEntry = entry;
42+
this->LoadImpl(entry, mask);
43+
}
44+
}
45+
3246
/// Return the column value for the given entry.
3347
/// \tparam T The column type
3448
/// \param entry The entry number
3549
///
3650
/// The caller is responsible for checking that the returned value actually
3751
/// exists.
3852
template <typename T>
39-
T *TryGet(Long64_t entry)
53+
T *TryGet(std::size_t idx)
4054
{
41-
return static_cast<T *>(GetImpl(entry));
55+
return static_cast<T *>(GetImpl(idx));
4256
}
4357

4458
private:
45-
virtual void *GetImpl(Long64_t entry) = 0;
59+
virtual void *GetImpl(std::size_t idx) = 0;
60+
virtual void LoadImpl(Long64_t /*entry*/, bool /*mask*/) = 0;
4661
};
4762

4863
} // namespace RDF

tree/dataframe/inc/ROOT/RDF/RDSColumnReader.hxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ template <typename T>
2323
class R__CLING_PTRCHECK(off) RDSColumnReader final : public ROOT::Detail::RDF::RColumnReaderBase {
2424
T **fDSValuePtr = nullptr;
2525

26-
void *GetImpl(Long64_t) final { return *fDSValuePtr; }
26+
void *GetImpl(std::size_t) final { return *fDSValuePtr; }
27+
void LoadImpl(Long64_t, bool) final {}
2728

2829
public:
2930
RDSColumnReader(void *DSValuePtr) : fDSValuePtr(static_cast<T **>(DSValuePtr)) {}

tree/dataframe/inc/ROOT/RDF/RDefaultValueFor.hxx

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ class R__CLING_PTRCHECK(off) RDefaultValueFor final : public RDefineBase {
5757
/// The map key is the full variation name, e.g. "pt:up".
5858
std::unordered_map<std::string, std::unique_ptr<RDefineBase>> fVariedDefines;
5959

60-
T &GetValueOrDefault(unsigned int slot, Long64_t entry)
60+
T &GetValueOrDefault(unsigned int slot, std::size_t idx)
6161
{
62-
if (auto *value = fValues[slot]->template TryGet<T>(entry))
62+
if (auto *value = fValues[slot]->template TryGet<T>(idx))
6363
return *value;
6464
else
6565
return fDefaultValue;
@@ -104,12 +104,15 @@ public:
104104
}
105105

106106
/// Update the value at the address returned by GetValuePtr with the content corresponding to the given entry
107-
void Update(unsigned int slot, Long64_t entry) final
107+
void Update(unsigned int slot, Long64_t entry, bool mask) final
108108
{
109109
if (entry != fLastCheckedEntry[slot * RDFInternal::CacheLineStep<Long64_t>()]) {
110110
// evaluate this define expression, cache the result
111-
fLastResults[slot * RDFInternal::CacheLineStep<T>()] = GetValueOrDefault(slot, entry);
112-
fLastCheckedEntry[slot * RDFInternal::CacheLineStep<Long64_t>()] = entry;
111+
fValues[slot]->Load(entry, mask);
112+
if (mask) {
113+
fLastResults[slot * RDFInternal::CacheLineStep<T>()] = GetValueOrDefault(slot, /*idx=*/0u);
114+
fLastCheckedEntry[slot * RDFInternal::CacheLineStep<Long64_t>()] = entry;
115+
}
113116
}
114117
}
115118

tree/dataframe/inc/ROOT/RDF/RDefine.hxx

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -71,39 +71,43 @@ class R__CLING_PTRCHECK(off) RDefine final : public RDefineBase {
7171
std::unordered_map<std::string, std::unique_ptr<RDefineBase>> fVariedDefines;
7272

7373
template <typename ColType>
74-
auto GetValueChecked(unsigned int slot, std::size_t readerIdx, Long64_t entry) -> ColType &
74+
auto GetValueChecked(unsigned int slot, std::size_t readerIdx, std::size_t idx) -> ColType &
7575
{
76-
if (auto *val = fValues[slot][readerIdx]->template TryGet<ColType>(entry))
76+
if (auto *val = fValues[slot][readerIdx]->template TryGet<ColType>(idx))
7777
return *val;
7878

7979
throw std::out_of_range{"RDataFrame: Define could not retrieve value for column '" + fColumnNames[readerIdx] +
80-
"' for entry " + std::to_string(entry) +
80+
"' for entry " + std::to_string(idx) +
8181
". You can use the DefaultValueFor operation to provide a default value, or "
8282
"FilterAvailable/FilterMissing to discard/keep entries with missing values instead."};
8383
}
8484

8585
template <typename... ColTypes, std::size_t... S>
86-
void UpdateHelper(unsigned int slot, Long64_t entry, TypeList<ColTypes...>, std::index_sequence<S...>, NoneTag)
86+
void UpdateHelper(unsigned int slot, std::size_t idx, Long64_t /*entry*/, TypeList<ColTypes...>,
87+
std::index_sequence<S...>, NoneTag)
8788
{
8889
fLastResults[slot * RDFInternal::CacheLineStep<ret_type>()] =
89-
fExpression(GetValueChecked<ColTypes>(slot, S, entry)...);
90-
(void)entry; // avoid unused parameter warning (gcc 12.1)
90+
fExpression(GetValueChecked<ColTypes>(slot, S, idx)...);
91+
(void)idx; // avoid unused parameter warning (gcc 12.1)
9192
}
9293

9394
template <typename... ColTypes, std::size_t... S>
94-
void UpdateHelper(unsigned int slot, Long64_t entry, TypeList<ColTypes...>, std::index_sequence<S...>, SlotTag)
95+
void UpdateHelper(unsigned int slot, std::size_t idx, Long64_t /*entry*/, TypeList<ColTypes...>,
96+
std::index_sequence<S...>, SlotTag)
9597
{
9698
fLastResults[slot * RDFInternal::CacheLineStep<ret_type>()] =
97-
fExpression(slot, GetValueChecked<ColTypes>(slot, S, entry)...);
98-
(void)entry; // avoid unused parameter warning (gcc 12.1)
99+
fExpression(slot, GetValueChecked<ColTypes>(slot, S, idx)...);
100+
(void)idx; // avoid unused parameter warning (gcc 12.1)
99101
}
100102

101103
template <typename... ColTypes, std::size_t... S>
102-
void
103-
UpdateHelper(unsigned int slot, Long64_t entry, TypeList<ColTypes...>, std::index_sequence<S...>, SlotAndEntryTag)
104+
void UpdateHelper(unsigned int slot, std::size_t idx, Long64_t batchFirstEntry, TypeList<ColTypes...>,
105+
std::index_sequence<S...>, SlotAndEntryTag)
104106
{
105107
fLastResults[slot * RDFInternal::CacheLineStep<ret_type>()] =
106-
fExpression(slot, entry, GetValueChecked<ColTypes>(slot, S, entry)...);
108+
fExpression(slot, batchFirstEntry + idx, GetValueChecked<ColTypes>(slot, S, idx)...);
109+
(void)idx; // avoid unused parameter warning (gcc 12.1)
110+
(void)batchFirstEntry; // avoid unused parameter warning (gcc 12.1)
107111
}
108112

109113
public:
@@ -134,12 +138,14 @@ public:
134138
}
135139

136140
/// Update the value at the address returned by GetValuePtr with the content corresponding to the given entry
137-
void Update(unsigned int slot, Long64_t entry) final
141+
void Update(unsigned int slot, Long64_t entry, bool mask) final
138142
{
139143
if (entry != fLastCheckedEntry[slot * RDFInternal::CacheLineStep<Long64_t>()]) {
140-
// evaluate this define expression, cache the result
141-
UpdateHelper(slot, entry, ColumnTypes_t{}, TypeInd_t{}, ExtraArgsTag{});
142-
fLastCheckedEntry[slot * RDFInternal::CacheLineStep<Long64_t>()] = entry;
144+
std::for_each(fValues[slot].begin(), fValues[slot].end(), [entry, mask](auto *v) { v->Load(entry, mask); });
145+
if (mask) {
146+
UpdateHelper(slot, /*idx=*/0u, entry, ColumnTypes_t{}, TypeInd_t{}, ExtraArgsTag{});
147+
fLastCheckedEntry[slot * RDFInternal::CacheLineStep<Long64_t>()] = entry;
148+
}
143149
}
144150
}
145151

tree/dataframe/inc/ROOT/RDF/RDefineBase.hxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public:
6363
std::string GetName() const;
6464
std::string GetTypeName() const;
6565
/// Update the value at the address returned by GetValuePtr with the content corresponding to the given entry
66-
virtual void Update(unsigned int slot, Long64_t entry) = 0;
66+
virtual void Update(unsigned int slot, Long64_t entry, bool mask) = 0;
6767
/// Update function to be called once per sample, used if the derived type is a RDefinePerSample
6868
virtual void Update(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &/*id*/) {}
6969
/// Clean-up operations to be performed at the end of a task.

tree/dataframe/inc/ROOT/RDF/RDefinePerSample.hxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public:
5959
return static_cast<void *>(&fLastResults[slot * RDFInternal::CacheLineStep<RetType_t>()]);
6060
}
6161

62-
void Update(unsigned int, Long64_t) final
62+
void Update(unsigned int, Long64_t, bool) final
6363
{
6464
// no-op
6565
}

tree/dataframe/inc/ROOT/RDF/RDefineReader.hxx

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,9 @@ class R__CLING_PTRCHECK(off) RDefineReader final : public ROOT::Detail::RDF::RCo
4242
/// The slot this value belongs to.
4343
unsigned int fSlot = std::numeric_limits<unsigned int>::max();
4444

45-
void *GetImpl(Long64_t entry) final
46-
{
47-
fDefine.Update(fSlot, entry);
48-
return fValuePtr;
49-
}
45+
void *GetImpl(std::size_t /*idx*/) final { return fValuePtr; }
46+
47+
void LoadImpl(Long64_t entry, bool mask) final { fDefine.Update(fSlot, entry, mask); }
5048

5149
public:
5250
RDefineReader(unsigned int slot, RDFDetail::RDefineBase &define)

0 commit comments

Comments
 (0)