Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions tree/dataframe/inc/ROOT/RDF/RInterface.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class GraphCreatorHelper;
void ChangeEmptyEntryRange(const ROOT::RDF::RNode &node, std::pair<ULong64_t, ULong64_t> &&newRange);
void ChangeBeginAndEndEntries(const RNode &node, Long64_t begin, Long64_t end);
void ChangeSpec(const ROOT::RDF::RNode &node, ROOT::RDF::Experimental::RDatasetSpec &&spec);
// Returns one [begin, end) entry pair per cluster of the dataset attached to `node`,
// across all its files, with entry numbers expressed as global offsets.
std::vector<std::pair<std::uint64_t, std::uint64_t>> GetDatasetGlobalClusterBoundaries(const RNode &node);
void TriggerRun(ROOT::RDF::RNode node);
std::string GetDataSourceLabel(const ROOT::RDF::RNode &node);
void SetTTreeLifeline(ROOT::RDF::RNode &node, std::any lifeline);
Expand Down Expand Up @@ -134,6 +135,8 @@ class RInterface : public RInterfaceBase {
friend void RDFInternal::ChangeEmptyEntryRange(const RNode &node, std::pair<ULong64_t, ULong64_t> &&newRange);
friend void RDFInternal::ChangeBeginAndEndEntries(const RNode &node, Long64_t start, Long64_t end);
friend void RDFInternal::ChangeSpec(const RNode &node, ROOT::RDF::Experimental::RDatasetSpec &&spec);
// Friend: the implementation of this function calls GetLoopManager() on the node it receives.
friend std::vector<std::pair<std::uint64_t, std::uint64_t>>
RDFInternal::GetDatasetGlobalClusterBoundaries(const RNode &node);
friend std::string ROOT::Internal::RDF::GetDataSourceLabel(const RNode &node);
friend void ROOT::Internal::RDF::SetTTreeLifeline(ROOT::RDF::RNode &node, std::any lifeline);
std::shared_ptr<Proxied> fProxiedPtr; ///< Smart pointer to the graph node encapsulated by this RInterface.
Expand Down
13 changes: 13 additions & 0 deletions tree/dataframe/inc/ROOT/RNTupleDS.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,17 @@ class RFieldBase;
class RDataFrame;
class RNTuple;
} // namespace ROOT
// Forward declarations of the types that appear in the signature of
// GetDatasetGlobalClusterBoundaries, declared further below.
namespace ROOT::Detail::RDF {
class RNodeBase;
}
namespace ROOT::RDF {
template <typename T>
class RInterface;
}
namespace ROOT::Internal::RDF {
class RNTupleColumnReader;
// Declared here so that it can be named in a friend declaration of RNTupleDS.
// Returns a vector of (begin, end) entry pairs, one per cluster of the dataset.
std::vector<std::pair<std::uint64_t, std::uint64_t>>
GetDatasetGlobalClusterBoundaries(const ROOT::RDF::RInterface<ROOT::Detail::RDF::RNodeBase> &node);
}
namespace ROOT::Internal {
class RPageSource;
Expand Down Expand Up @@ -206,6 +215,10 @@ class RNTupleDS final : public ROOT::RDF::RDataSource {
const std::vector<std::string> &fileNames,
const std::pair<ULong64_t, ULong64_t> &range);

// This function needs to access the private members fNTupleName and fFileNames
friend std::vector<std::pair<std::uint64_t, std::uint64_t>> ROOT::Internal::RDF::GetDatasetGlobalClusterBoundaries(
const ROOT::RDF::RInterface<ROOT::Detail::RDF::RNodeBase> &node);

explicit RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames,
const std::pair<ULong64_t, ULong64_t> &range);

Expand Down
107 changes: 107 additions & 0 deletions tree/dataframe/src/RInterface.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@
*************************************************************************/

#include "ROOT/RDF/RInterface.hxx"
#include <ROOT/InternalTreeUtils.hxx>
#include "ROOT/RNTupleDS.hxx"
#include "ROOT/RTTreeDS.hxx"
#ifdef R__USE_IMT
#include <ROOT/TThreadExecutor.hxx>
#endif

void ROOT::Internal::RDF::ChangeEmptyEntryRange(const ROOT::RDF::RNode &node,
std::pair<ULong64_t, ULong64_t> &&newRange)
Expand All @@ -34,6 +40,107 @@ void ROOT::Internal::RDF::ChangeSpec(const ROOT::RDF::RNode &node, ROOT::RDF::Ex
node.GetLoopManager()->ChangeSpec(std::move(spec));
}

/**
 * \brief Retrieve the cluster boundaries for each cluster in the dataset,
 * across files, with a global offset.
 *
 * Only data sources of type RTTreeDS and RNTupleDS are supported. The boundaries
 * are first collected file by file, then shifted so that the entries of each file
 * start where the entries of the previous file end.
 *
 * \param node Any node of the computation graph.
 * \return A vector of [begin, end) entry pairs for each cluster in the dataset,
 *         in the order of the files of the dataset. The vector is empty if the
 *         data source reports no files.
 * \throws std::runtime_error if the computation graph has no data source, if the
 *         data source is neither an RTTreeDS nor an RNTupleDS, or if the RTTreeDS
 *         has no TTree attached.
 *
 * \note When ROOT is built with IMT support (R__USE_IMT), files are processed in
 *       parallel using a ROOT::TThreadExecutor; otherwise they are processed
 *       sequentially.
 */
std::vector<std::pair<std::uint64_t, std::uint64_t>>
ROOT::Internal::RDF::GetDatasetGlobalClusterBoundaries(const ROOT::RDF::RNode &node)
{
   auto *lm = node.GetLoopManager();
   auto *ds = lm->GetDataSource();

   if (!ds) {
      throw std::runtime_error("Cannot retrieve cluster boundaries: no data source available.");
   }

   std::string datasetName;
   std::vector<std::string> fileNames;
   bool isTTree = false;

   if (auto *ttreeds = dynamic_cast<RTTreeDS *>(ds)) {
      auto *tree = ttreeds->GetTree();
      // Checked at runtime rather than with assert: an assert is compiled out when
      // NDEBUG is defined, which would leave a null pointer dereference just below.
      if (!tree) {
         throw std::runtime_error("Cannot retrieve cluster boundaries: the internal TTree is not available.");
      }
      datasetName = tree->GetName();
      fileNames = ROOT::Internal::TreeUtils::GetFileNamesFromTree(*tree);
      isTTree = true;
   } else if (auto *rntupleds = dynamic_cast<RNTupleDS *>(ds)) {
      datasetName = rntupleds->fNTupleName;
      fileNames = rntupleds->fFileNames;
   } else {
      throw std::runtime_error("Cannot retrieve cluster boundaries: unsupported data source type.");
   }

   std::vector<std::pair<std::uint64_t, std::uint64_t>> boundaries;

   if (fileNames.empty()) {
      return boundaries;
   }

   const auto nFiles = fileNames.size();

   // For each file: the file-local cluster boundaries + the number of entries in the file
   using FileResult = std::pair<std::vector<std::pair<std::uint64_t, std::uint64_t>>, std::uint64_t>;
   std::vector<FileResult> perFileResults(nFiles);

   // Process a single file and return its file-local cluster boundaries + entry count
   auto processFile = [&datasetName, isTTree](const std::string &fileName) -> FileResult {
      std::vector<std::pair<std::uint64_t, std::uint64_t>> clusters;
      std::uint64_t nEntries = 0;

      if (isTTree) {
         // TTree: boundaries come as a flat list of cluster edges
         auto [clusterBoundaries, entries] = ROOT::Internal::TreeUtils::GetClustersAndEntries(datasetName, fileName);
         nEntries = entries;
         if (clusterBoundaries.size() > 1) {
            clusters.reserve(clusterBoundaries.size() - 1);
         }
         // [0, 10, 20, ...] --> [(0,10), (10,20), ...]
         for (std::size_t i = 0; i + 1 < clusterBoundaries.size(); ++i) {
            clusters.emplace_back(clusterBoundaries[i], clusterBoundaries[i + 1]);
         }
      } else {
         // RNTuple: boundaries come as one (first entry, last entry + 1) record per cluster
         auto [clusterBoundaries, entries] = GetClustersAndEntries(datasetName, fileName);
         nEntries = entries;
         clusters.reserve(clusterBoundaries.size());
         for (const auto &cluster : clusterBoundaries) {
            clusters.emplace_back(cluster.fFirstEntry, cluster.fLastEntryPlusOne);
         }
      }

      // Move the vector into the result instead of copying it
      return {std::move(clusters), nEntries};
   };

#ifdef R__USE_IMT
   ROOT::TThreadExecutor pool;
   // Distribute the processing of files in parallel across the thread pool,
   // each task takes the index of a file in the fileNames vector as input
   // and fills the corresponding position in the perFileResults vector.
   // Every task writes to a distinct element, so no synchronisation is needed.
   pool.Foreach([&perFileResults, &fileNames,
                 &processFile](std::size_t idx) { perFileResults[idx] = processFile(fileNames[idx]); },
                ROOT::TSeq<std::size_t>(nFiles));
#else
   // Process files sequentially as a fallback
   for (std::size_t idx = 0; idx < nFiles; ++idx) {
      perFileResults[idx] = processFile(fileNames[idx]);
   }
#endif

   // Now that we have the cluster boundaries and entry counts for each file,
   // we can compute the global boundaries with offsets (sequentially)
   std::size_t nClusters = 0;
   for (const auto &fileResult : perFileResults) {
      nClusters += fileResult.first.size();
   }
   boundaries.reserve(nClusters);

   std::uint64_t offset = 0;
   for (const auto &[clusters, nEntries] : perFileResults) {
      for (const auto &[start, end] : clusters) {
         boundaries.emplace_back(offset + start, offset + end);
      }
      // The next file starts right after the last entry of this one
      offset += nEntries;
   }

   return boundaries;
}

/**
* \brief Trigger the execution of an RDataFrame computation graph.
* \param[in] node A node of the computation graph (not a result).
Expand Down
1 change: 1 addition & 0 deletions tree/dataframe/src/RNTupleDS.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ std::unique_ptr<ROOT::Internal::RPageSource> CreatePageSource(std::string_view n
/// Build a data source for the RNTuple `ntupleName` stored in the single file `fileName`.
/// The page source is created up front and handed to the delegated constructor.
ROOT::RDF::RNTupleDS::RNTupleDS(std::string_view ntupleName, std::string_view fileName)
   : RNTupleDS(CreatePageSource(ntupleName, fileName))
{
   // A delegating constructor cannot initialise other members in its initialiser
   // list, so the file list and the dataset name are recorded in the body.
   fFileNames.assign(1, std::string{fileName});
   fNTupleName = ntupleName;
}

Expand Down
90 changes: 90 additions & 0 deletions tree/dataframe/test/dataframe_utils.cxx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "ROOT/RDataFrame.hxx"
#include "ROOT/RDF/Utils.hxx"
#include "ROOT/RNTupleDS.hxx"
#include "TTree.h"

#include "gtest/gtest.h"
Expand Down Expand Up @@ -237,3 +238,92 @@ TEST(RDataFrameUtils, TypeName2TypeID)
EXPECT_THROW(RDFInt::TypeName2TypeID("float *"), std::runtime_error);
EXPECT_THROW(RDFInt::TypeName2TypeID("float &"), std::runtime_error);
}

// Checks GetDatasetGlobalClusterBoundaries on TTree and RNTuple datasets,
// each made of either a single file or two files.
TEST(RDataFrameUtils, GetClusterRanges)
{
   // Helper RAII class for file cleanup: removes the file at the given path on destruction
   class FileRAII {
   private:
      std::string fPath;

   public:
      explicit FileRAII(const std::string &path) : fPath(path) {}
      FileRAII(const FileRAII &) = delete;
      FileRAII &operator=(const FileRAII &) = delete;
      ~FileRAII() { std::remove(fPath.c_str()); }
      auto GetPath() const { return fPath.c_str(); }
   };

   FileRAII fTTree1("dataframe_interfaceAndUtils_2_tree_1.root");
   FileRAII fTTree2("dataframe_interfaceAndUtils_2_tree_2.root");
   FileRAII fRNTuple1("dataframe_interfaceAndUtils_2_rntuple_1.root");
   FileRAII fRNTuple2("dataframe_interfaceAndUtils_2_rntuple_2.root");

   // Unsigned on purpose: these values are compared against container sizes and
   // entry numbers, which are unsigned, so no comparison mixes signedness.
   constexpr ULong64_t nEntries = 1000;
   constexpr ULong64_t clusterSize = 250;

   // Write the two TTree input files, each with nEntries entries and clusters of clusterSize entries
   {
      ROOT::RDF::RSnapshotOptions opts;
      opts.fAutoFlush = clusterSize;
      auto df = ROOT::RDataFrame(nEntries).Define("i", "rdfentry_");
      df.Snapshot("t", fTTree1.GetPath(), {"i"}, opts);
      df.Snapshot("t", fTTree2.GetPath(), {"i"}, opts);
   }

   // Test TTree single file
   {
      ROOT::RDataFrame dfTTree("t", fTTree1.GetPath());
      auto rangesTTree = RDFInt::GetDatasetGlobalClusterBoundaries(dfTTree);

      EXPECT_EQ(rangesTTree.size(), nEntries / clusterSize);
      for (size_t i = 0; i < rangesTTree.size(); ++i) {
         EXPECT_EQ(rangesTTree[i].first, i * clusterSize);
         EXPECT_EQ(rangesTTree[i].second, (i + 1) * clusterSize);
      }
   }

   // Test TTree multiple files (chain): the boundaries of the second file
   // must continue from the end of the first one
   {
      ROOT::RDataFrame dfTTreeChain("t", {fTTree1.GetPath(), fTTree2.GetPath()});
      auto rangesTTreeChain = RDFInt::GetDatasetGlobalClusterBoundaries(dfTTreeChain);

      EXPECT_EQ(rangesTTreeChain.size(), 2 * nEntries / clusterSize);
      for (size_t i = 0; i < rangesTTreeChain.size(); ++i) {
         EXPECT_EQ(rangesTTreeChain[i].first, i * clusterSize);
         EXPECT_EQ(rangesTTreeChain[i].second, (i + 1) * clusterSize);
      }
   }

   // Write the two RNTuple input files, each with nEntries entries
   {
      ROOT::RDF::RSnapshotOptions opts;
      opts.fOutputFormat = ROOT::RDF::ESnapshotOutputFormat::kRNTuple;
      auto df = ROOT::RDataFrame(nEntries).Define("i", "rdfentry_");
      df.Snapshot("nt", fRNTuple1.GetPath(), {"i"}, opts);
      df.Snapshot("nt", fRNTuple2.GetPath(), {"i"}, opts);
   }

   // Test RNTuple single file. No cluster size is requested when writing, so only
   // check that the ranges start at 0, end at nEntries and are contiguous.
   {
      auto dfRNTuple = ROOT::RDF::FromRNTuple("nt", fRNTuple1.GetPath());
      auto rangesRNTuple = RDFInt::GetDatasetGlobalClusterBoundaries(dfRNTuple);

      EXPECT_FALSE(rangesRNTuple.empty());
      EXPECT_EQ(rangesRNTuple.front().first, 0u);
      EXPECT_EQ(rangesRNTuple.back().second, nEntries);
      for (size_t i = 1; i < rangesRNTuple.size(); ++i) {
         EXPECT_EQ(rangesRNTuple[i].first, rangesRNTuple[i - 1].second);
      }
   }

   // Test RNTuple multiple files: same checks, over the entries of both files
   {
      auto dfRNTupleChain =
         ROOT::RDF::FromRNTuple("nt", std::vector<std::string>{fRNTuple1.GetPath(), fRNTuple2.GetPath()});
      auto rangesRNTupleChain = RDFInt::GetDatasetGlobalClusterBoundaries(dfRNTupleChain);

      EXPECT_FALSE(rangesRNTupleChain.empty());
      EXPECT_EQ(rangesRNTupleChain.front().first, 0u);
      EXPECT_EQ(rangesRNTupleChain.back().second, 2 * nEntries);
      for (size_t i = 1; i < rangesRNTupleChain.size(); ++i) {
         EXPECT_EQ(rangesRNTupleChain[i].first, rangesRNTupleChain[i - 1].second);
      }
   }
}
Loading