Skip to content

Commit 73ed51f

Browse files
committed
[RDF] Add internal utility to retrieve cluster entry ranges
1 parent 8a2ab69 commit 73ed51f

2 files changed

Lines changed: 114 additions & 0 deletions

File tree

tree/dataframe/inc/ROOT/RDF/RInterface.hxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ class GraphCreatorHelper;
100100
void ChangeEmptyEntryRange(const ROOT::RDF::RNode &node, std::pair<ULong64_t, ULong64_t> &&newRange);
101101
void ChangeBeginAndEndEntries(const RNode &node, Long64_t begin, Long64_t end);
102102
void ChangeSpec(const ROOT::RDF::RNode &node, ROOT::RDF::Experimental::RDatasetSpec &&spec);
103+
std::vector<std::pair<ULong64_t, ULong64_t>> GetClusterBoundaries(const RNode &node);
103104
void TriggerRun(ROOT::RDF::RNode node);
104105
std::string GetDataSourceLabel(const ROOT::RDF::RNode &node);
105106
void SetTTreeLifeline(ROOT::RDF::RNode &node, std::any lifeline);
@@ -134,6 +135,7 @@ class RInterface : public RInterfaceBase {
134135
friend void RDFInternal::ChangeEmptyEntryRange(const RNode &node, std::pair<ULong64_t, ULong64_t> &&newRange);
135136
friend void RDFInternal::ChangeBeginAndEndEntries(const RNode &node, Long64_t start, Long64_t end);
136137
friend void RDFInternal::ChangeSpec(const RNode &node, ROOT::RDF::Experimental::RDatasetSpec &&spec);
138+
friend std::vector<std::pair<ULong64_t, ULong64_t>> RDFInternal::GetClusterBoundaries(const RNode &node);
137139
friend std::string ROOT::Internal::RDF::GetDataSourceLabel(const RNode &node);
138140
friend void ROOT::Internal::RDF::SetTTreeLifeline(ROOT::RDF::RNode &node, std::any lifeline);
139141
std::shared_ptr<Proxied> fProxiedPtr; ///< Smart pointer to the graph node encapsulated by this RInterface.

tree/dataframe/src/RInterface.cxx

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@
99
*************************************************************************/
1010

1111
#include "ROOT/RDF/RInterface.hxx"
12+
#include <ROOT/InternalTreeUtils.hxx>
13+
#include "ROOT/RNTupleDS.hxx"
14+
#include "ROOT/RTTreeDS.hxx"
15+
#ifdef R__USE_IMT
16+
#include <ROOT/TThreadExecutor.hxx>
17+
#endif
1218

1319
void ROOT::Internal::RDF::ChangeEmptyEntryRange(const ROOT::RDF::RNode &node,
1420
std::pair<ULong64_t, ULong64_t> &&newRange)
@@ -34,6 +40,112 @@ void ROOT::Internal::RDF::ChangeSpec(const ROOT::RDF::RNode &node, ROOT::RDF::Ex
3440
node.GetLoopManager()->ChangeSpec(std::move(spec));
3541
}
3642

43+
/**
44+
* \brief Retrieve the cluster boundaries for each cluster in the dataset,
45+
* across files, with a global offset.
46+
*
47+
* \param node Any node of the computation graph.
48+
* \return A vector of [start, end) entry pairs for each cluster in the dataset.
49+
*
50+
* \note When IMT is enabled, files are processed in parallel using a thread pool.
51+
*/
52+
std::vector<std::pair<ULong64_t, ULong64_t>> ROOT::Internal::RDF::GetClusterBoundaries(const ROOT::RDF::RNode &node)
53+
{
54+
std::vector<std::pair<ULong64_t, ULong64_t>> boundaries{};
55+
56+
auto *lm = node.GetLoopManager();
57+
auto *ds = lm->GetDataSource();
58+
59+
if (!ds) {
60+
throw std::runtime_error("Cannot retrieve cluster boundaries: no data source available.");
61+
}
62+
63+
std::string datasetName;
64+
std::vector<std::string> fileNames;
65+
bool isTTree = false;
66+
67+
if (auto *ttreeds = dynamic_cast<RTTreeDS *>(ds)) {
68+
auto *tree = ttreeds->GetTree();
69+
if (!tree) {
70+
return boundaries;
71+
}
72+
datasetName = tree->GetName();
73+
fileNames = ROOT::Internal::TreeUtils::GetFileNamesFromTree(*tree);
74+
isTTree = true;
75+
} else if (auto *rntupleds = dynamic_cast<RNTupleDS *>(ds)) {
76+
datasetName = rntupleds->fNTupleName;
77+
fileNames = rntupleds->fFileNames;
78+
isTTree = false;
79+
} else {
80+
throw std::runtime_error("Cannot retrieve cluster boundaries: unsupported data source type.");
81+
}
82+
83+
if (fileNames.empty()) {
84+
return boundaries;
85+
}
86+
87+
const auto nFiles = fileNames.size();
88+
89+
// For each file retrieve the cluster boundaries + the number of entries
90+
using FileResult = std::pair<std::vector<std::pair<ULong64_t, ULong64_t>>, ULong64_t>;
91+
std::vector<FileResult> perFileResults(nFiles);
92+
93+
// Function to process a single file and return its cluster boundaries + entry count
94+
auto processFile = [&datasetName, isTTree](const std::string &fileName) -> FileResult {
95+
std::vector<std::pair<ULong64_t, ULong64_t>> clusters;
96+
ULong64_t nEntries = 0;
97+
98+
if (isTTree) {
99+
// TTree
100+
auto [clusterBoundaries, entries] = ROOT::Internal::TreeUtils::GetClustersAndEntries(datasetName, fileName);
101+
nEntries = entries;
102+
// [0, 10, 20, ...] --> [(0,10), (10,20), ...]
103+
for (std::size_t i = 0; i + 1 < clusterBoundaries.size(); ++i) {
104+
clusters.emplace_back(clusterBoundaries[i], clusterBoundaries[i + 1]);
105+
}
106+
} else {
107+
// RNTuple
108+
auto [clusterBoundaries, entries] = GetClustersAndEntries(datasetName, fileName);
109+
nEntries = entries;
110+
for (const auto &cluster : clusterBoundaries) {
111+
clusters.emplace_back(cluster.fFirstEntry, cluster.fLastEntryPlusOne);
112+
}
113+
}
114+
115+
return {clusters, nEntries};
116+
};
117+
118+
#ifdef R__USE_IMT
119+
if (ROOT::IsImplicitMTEnabled()) {
120+
ROOT::TThreadExecutor pool;
121+
// Distribute the processing of files in parallel across the thread pool,
122+
// each thread takes a file and its index in the fileNames vector as input
123+
// and fills the corresponding position in the perFileResults vector
124+
pool.Foreach([&perFileResults, &fileNames,
125+
&processFile](std::size_t idx) { perFileResults[idx] = processFile(fileNames[idx]); },
126+
ROOT::TSeq<std::size_t>(nFiles));
127+
} else
128+
#endif
129+
{
130+
// Process files sequentially as a fallback
131+
for (std::size_t idx = 0; idx < nFiles; ++idx) {
132+
perFileResults[idx] = processFile(fileNames[idx]);
133+
}
134+
}
135+
136+
// Now that we have the cluster boundaries and entry counts for each file,
137+
// we can compute the global boundaries with offsets (sequentially)
138+
ULong64_t offset = 0;
139+
for (const auto &[clusters, nEntries] : perFileResults) {
140+
for (const auto &[start, end] : clusters) {
141+
boundaries.emplace_back(offset + start, offset + end);
142+
}
143+
offset += nEntries;
144+
}
145+
146+
return boundaries;
147+
}
148+
37149
/**
38150
* \brief Trigger the execution of an RDataFrame computation graph.
39151
* \param[in] node A node of the computation graph (not a result).

0 commit comments

Comments
 (0)