Skip to content

Commit 3734652

Browse files
committed
[RDF] Add internal utility to retrieve cluster entry ranges
1 parent de7835a commit 3734652

2 files changed

Lines changed: 110 additions & 0 deletions

File tree

tree/dataframe/inc/ROOT/RDF/RInterface.hxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ class GraphCreatorHelper;
100100
void ChangeEmptyEntryRange(const ROOT::RDF::RNode &node, std::pair<ULong64_t, ULong64_t> &&newRange);
101101
void ChangeBeginAndEndEntries(const RNode &node, Long64_t begin, Long64_t end);
102102
void ChangeSpec(const ROOT::RDF::RNode &node, ROOT::RDF::Experimental::RDatasetSpec &&spec);
103+
std::vector<std::pair<std::uint64_t, std::uint64_t>> GetDatasetGlobalClusterBoundaries(const RNode &node);
103104
void TriggerRun(ROOT::RDF::RNode node);
104105
std::string GetDataSourceLabel(const ROOT::RDF::RNode &node);
105106
void SetTTreeLifeline(ROOT::RDF::RNode &node, std::any lifeline);
@@ -134,6 +135,8 @@ class RInterface : public RInterfaceBase {
134135
friend void RDFInternal::ChangeEmptyEntryRange(const RNode &node, std::pair<ULong64_t, ULong64_t> &&newRange);
135136
friend void RDFInternal::ChangeBeginAndEndEntries(const RNode &node, Long64_t start, Long64_t end);
136137
friend void RDFInternal::ChangeSpec(const RNode &node, ROOT::RDF::Experimental::RDatasetSpec &&spec);
138+
friend std::vector<std::pair<std::uint64_t, std::uint64_t>>
139+
RDFInternal::GetDatasetGlobalClusterBoundaries(const RNode &node);
137140
friend std::string ROOT::Internal::RDF::GetDataSourceLabel(const RNode &node);
138141
friend void ROOT::Internal::RDF::SetTTreeLifeline(ROOT::RDF::RNode &node, std::any lifeline);
139142
std::shared_ptr<Proxied> fProxiedPtr; ///< Smart pointer to the graph node encapsulated by this RInterface.

tree/dataframe/src/RInterface.cxx

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@
99
*************************************************************************/
1010

1111
#include "ROOT/RDF/RInterface.hxx"
12+
#include <ROOT/InternalTreeUtils.hxx>
13+
#include "ROOT/RNTupleDS.hxx"
14+
#include "ROOT/RTTreeDS.hxx"
15+
#ifdef R__USE_IMT
16+
#include <ROOT/TThreadExecutor.hxx>
17+
#endif
1218

1319
void ROOT::Internal::RDF::ChangeEmptyEntryRange(const ROOT::RDF::RNode &node,
1420
std::pair<ULong64_t, ULong64_t> &&newRange)
@@ -34,6 +40,107 @@ void ROOT::Internal::RDF::ChangeSpec(const ROOT::RDF::RNode &node, ROOT::RDF::Ex
3440
node.GetLoopManager()->ChangeSpec(std::move(spec));
3541
}
3642

43+
/**
44+
* \brief Retrieve the cluster boundaries for each cluster in the dataset,
45+
* across files, with a global offset.
46+
*
47+
* \param node Any node of the computation graph.
48+
* \return A vector of [begin, end) entry pairs for each cluster in the dataset.
49+
*
50+
* \note When IMT is enabled, files are processed in parallel using a thread pool.
51+
*/
52+
std::vector<std::pair<std::uint64_t, std::uint64_t>>
53+
ROOT::Internal::RDF::GetDatasetGlobalClusterBoundaries(const ROOT::RDF::RNode &node)
54+
{
55+
std::vector<std::pair<std::uint64_t, std::uint64_t>> boundaries{};
56+
57+
auto *lm = node.GetLoopManager();
58+
auto *ds = lm->GetDataSource();
59+
60+
if (!ds) {
61+
throw std::runtime_error("Cannot retrieve cluster boundaries: no data source available.");
62+
}
63+
64+
std::string datasetName;
65+
std::vector<std::string> fileNames;
66+
bool isTTree = false;
67+
68+
if (auto *ttreeds = dynamic_cast<RTTreeDS *>(ds)) {
69+
auto *tree = ttreeds->GetTree();
70+
assert(tree && "The internal TTree is not available, something went wrong.");
71+
datasetName = tree->GetName();
72+
fileNames = ROOT::Internal::TreeUtils::GetFileNamesFromTree(*tree);
73+
isTTree = true;
74+
} else if (auto *rntupleds = dynamic_cast<RNTupleDS *>(ds)) {
75+
datasetName = rntupleds->fNTupleName;
76+
fileNames = rntupleds->fFileNames;
77+
isTTree = false;
78+
} else {
79+
throw std::runtime_error("Cannot retrieve cluster boundaries: unsupported data source type.");
80+
}
81+
82+
if (fileNames.empty()) {
83+
return boundaries;
84+
}
85+
86+
const auto nFiles = fileNames.size();
87+
88+
// For each file retrieve the cluster boundaries + the number of entries
89+
using FileResult = std::pair<std::vector<std::pair<std::uint64_t, std::uint64_t>>, std::uint64_t>;
90+
std::vector<FileResult> perFileResults(nFiles);
91+
92+
// Function to process a single file and return its cluster boundaries + entry count
93+
auto processFile = [&datasetName, isTTree](const std::string &fileName) -> FileResult {
94+
std::vector<std::pair<std::uint64_t, std::uint64_t>> clusters;
95+
std::uint64_t nEntries = 0;
96+
97+
if (isTTree) {
98+
// TTree
99+
auto [clusterBoundaries, entries] = ROOT::Internal::TreeUtils::GetClustersAndEntries(datasetName, fileName);
100+
nEntries = entries;
101+
// [0, 10, 20, ...] --> [(0,10), (10,20), ...]
102+
for (std::size_t i = 0; i + 1 < clusterBoundaries.size(); ++i) {
103+
clusters.emplace_back(clusterBoundaries[i], clusterBoundaries[i + 1]);
104+
}
105+
} else {
106+
// RNTuple
107+
auto [clusterBoundaries, entries] = GetClustersAndEntries(datasetName, fileName);
108+
nEntries = entries;
109+
for (const auto &cluster : clusterBoundaries) {
110+
clusters.emplace_back(cluster.fFirstEntry, cluster.fLastEntryPlusOne);
111+
}
112+
}
113+
114+
return {clusters, nEntries};
115+
};
116+
117+
#ifdef R__USE_IMT
118+
ROOT::TThreadExecutor pool;
119+
// Distribute the processing of files in parallel across the thread pool,
120+
// each thread takes a file and its index in the fileNames vector as input
121+
// and fills the corresponding position in the perFileResults vector
122+
pool.Foreach([&perFileResults, &fileNames,
123+
&processFile](std::size_t idx) { perFileResults[idx] = processFile(fileNames[idx]); },
124+
ROOT::TSeq<std::size_t>(nFiles));
125+
#else
126+
// Process files sequentially as a fallback
127+
for (std::size_t idx = 0; idx < nFiles; ++idx) {
128+
perFileResults[idx] = processFile(fileNames[idx]);
129+
}
130+
#endif
131+
// Now that we have the cluster boundaries and entry counts for each file,
132+
// we can compute the global boundaries with offsets (sequentially)
133+
std::uint64_t offset = 0;
134+
for (const auto &[clusters, nEntries] : perFileResults) {
135+
for (const auto &[start, end] : clusters) {
136+
boundaries.emplace_back(offset + start, offset + end);
137+
}
138+
offset += nEntries;
139+
}
140+
141+
return boundaries;
142+
}
143+
37144
/**
38145
* \brief Trigger the execution of an RDataFrame computation graph.
39146
* \param[in] node A node of the computation graph (not a result).

0 commit comments

Comments
 (0)