Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions tree/ntuple/inc/ROOT/RPageStorage.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,6 @@ The page source also gives access to the ntuple's metadata.
*/
// clang-format on
class RPageSource : public RPageStorage {
protected:
/// Summarizes meta-data necessary to load a certain page. Used by LoadPageFromSummary().
struct RPageSummary {
ROOT::DescriptorId_t fClusterId = 0;
Expand Down Expand Up @@ -626,6 +625,7 @@ public:
fDescriptor.IncGeneration();
fLock.unlock();
}
ROOT::RNTupleDescriptor &operator*() const { return fDescriptor; }
ROOT::RNTupleDescriptor *operator->() const { return &fDescriptor; }
void MoveIn(ROOT::RNTupleDescriptor desc) { fDescriptor = std::move(desc); }
};
Expand Down Expand Up @@ -705,6 +705,20 @@ private:
ROOT::Internal::RPageRef LoadPageFromSummary(ColumnHandle_t columnHandle, const RPageSummary &pageSummary);

protected:
/// Holds the uncompressed header and footer
struct RStructureBuffer {
std::unique_ptr<unsigned char[]> fBuffer; ///< single buffer for both header and footer
void *fPtrHeader = nullptr; ///< either nullptr or points into fBuffer
void *fPtrFooter = nullptr; ///< either nullptr or points into fBuffer

/// Called at the end of Attach(), i.e. when the header and footer are processed
void Reset()
{
RStructureBuffer empty;
std::swap(empty, *this);
}
};

/// Default I/O performance counters that get registered in `fMetrics`
struct RCounters {
ROOT::Experimental::Detail::RNTupleAtomicCounter &fNReadV;
Expand All @@ -727,16 +741,21 @@ protected:
};

std::unique_ptr<RCounters> fCounters;
RStructureBuffer fStructureBuffer; ///< Populated by LoadStructureImpl(), reset at the end of Attach()

ROOT::RNTupleReadOptions fOptions;

/// Fills fStructureBuffer with the compressed header and footer
virtual void LoadStructureImpl() = 0;
/// `LoadStructureImpl()` has been called before `AttachImpl()` is called
virtual ROOT::RNTupleDescriptor AttachImpl(ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode mode) = 0;
virtual ROOT::RNTupleDescriptor AttachImpl() = 0;
/// Returns a new, unattached page source for the same data set
virtual std::unique_ptr<RPageSource> CloneImpl() const = 0;
// Only called if a task scheduler is set. No-op be default.
virtual void UnzipClusterImpl(ROOT::Internal::RCluster *cluster);
// Loads a page list into the provided buffer. The buffer parameter needs to point to a memory region large enough
// to hold the compressed page list.
virtual void LoadPageListImpl(const RNTupleLocator &locator, void *buffer) = 0;
// Returns a sealed page from storage without adding it to the page pool. The sealed pages buffer and buffer size
// is already initialized.
virtual void LoadSealedPageImpl(const RNTupleLocator &locator, RSealedPage &sealedPage) = 0;
Expand Down
6 changes: 4 additions & 2 deletions tree/ntuple/inc/ROOT/RPageStorageDaos.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,15 @@ private:
/// A URI to a DAOS pool of the form 'daos://pool-label/container-label'
std::string fURI;

RDaosNTupleAnchor fAnchor;
ROOT::Internal::RNTupleDescriptorBuilder fDescriptorBuilder;

void LoadPageListImpl(const RNTupleLocator &locator, void *buffer) final;
void LoadSealedPageImpl(const RNTupleLocator &locator, RSealedPage &sealedPage) final;

protected:
void LoadStructureImpl() final {}
ROOT::RNTupleDescriptor AttachImpl(ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode mode) final;
void LoadStructureImpl() final;
ROOT::RNTupleDescriptor AttachImpl() final;
/// The cloned page source creates a new connection to the pool/container.
std::unique_ptr<RPageSource> CloneImpl() const final;

Expand Down
19 changes: 2 additions & 17 deletions tree/ntuple/inc/ROOT/RPageStorageFile.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -125,20 +125,6 @@ class RPageSourceFile : public RPageSource {
friend class ROOT::RNTuple;

private:
/// Holds the uncompressed header and footer
struct RStructureBuffer {
std::unique_ptr<unsigned char[]> fBuffer; ///< single buffer for both header and footer
void *fPtrHeader = nullptr; ///< either nullptr or points into fBuffer
void *fPtrFooter = nullptr; ///< either nullptr or points into fBuffer

/// Called at the end of Attach(), i.e. when the header and footer are processed
void Reset()
{
RStructureBuffer empty;
std::swap(empty, *this);
}
};

/// Either provided by CreateFromAnchor, or read from the ROOT file given the ntuple name
std::optional<RNTuple> fAnchor;
/// The last cluster from which a page got loaded. Points into fClusterPool->fPool
Expand All @@ -149,8 +135,6 @@ private:
ROOT::Internal::RMiniFileReader fReader;
/// The descriptor is created from the header and footer either in AttachImpl or in CreateFromAnchor
RNTupleDescriptorBuilder fDescriptorBuilder;
/// Populated by LoadStructureImpl(), reset at the end of Attach()
RStructureBuffer fStructureBuffer;
/// Tracks the last read offset for seek distance calculation
std::uint64_t fLastOffset = 0;

Expand All @@ -176,10 +160,11 @@ private:

protected:
void LoadStructureImpl() final;
ROOT::RNTupleDescriptor AttachImpl(RNTupleSerializer::EDescriptorDeserializeMode mode) final;
ROOT::RNTupleDescriptor AttachImpl() final;
/// The cloned page source creates a new raw file and reader and opens its own file descriptor to the data.
std::unique_ptr<RPageSource> CloneImpl() const final;

void LoadPageListImpl(const RNTupleLocator &locator, void *buffer) final;
void LoadSealedPageImpl(const RNTupleLocator &locator, RSealedPage &sealedPage) final;

public:
Expand Down
22 changes: 20 additions & 2 deletions tree/ntuple/src/RPageStorage.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,27 @@ void ROOT::Internal::RPageSource::LoadStructure()

void ROOT::Internal::RPageSource::Attach(RNTupleSerializer::EDescriptorDeserializeMode mode)
{
if (fIsAttached)
return;

LoadStructure();
if (!fIsAttached)
GetExclDescriptorGuard().MoveIn(AttachImpl(mode));

auto descGuard = GetExclDescriptorGuard();
descGuard.MoveIn(AttachImpl());
fStructureBuffer.Reset();

std::vector<unsigned char> buffer;
for (const auto &cgDesc : descGuard->GetClusterGroupIterable()) {
buffer.resize(cgDesc.GetPageListLength() + cgDesc.GetPageListLocator().GetNBytesOnStorage());
auto zipBuffer = buffer.data() + cgDesc.GetPageListLength();

LoadPageListImpl(cgDesc.GetPageListLocator(), zipBuffer);
RNTupleDecompressor::Unzip(zipBuffer, cgDesc.GetPageListLocator().GetNBytesOnStorage(),
cgDesc.GetPageListLength(), buffer.data());
RNTupleSerializer::DeserializePageList(buffer.data(), cgDesc.GetPageListLength(), cgDesc.GetId(), *descGuard,
mode);
}

fIsAttached = true;
}

Expand Down
101 changes: 54 additions & 47 deletions tree/ntuple/src/RPageStorageDaos.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <cstdlib>
#include <cstring>
#include <limits>
#include <tuple>
#include <utility>
#include <regex>
#include <cassert>
Expand Down Expand Up @@ -129,7 +130,7 @@ struct RDaosContainerNTupleLocator {
int InitNTupleDescriptorBuilder(ROOT::Experimental::Internal::RDaosContainer &cont,
ROOT::Internal::RNTupleDescriptorBuilder &builder)
{
std::unique_ptr<unsigned char[]> buffer, zipBuffer;
std::unique_ptr<unsigned char[]> buffer;
auto &anchor = fAnchor.emplace();
int err;

Expand All @@ -146,22 +147,7 @@ struct RDaosContainerNTupleLocator {

builder.SetVersion(anchor.fVersionEpoch, anchor.fVersionMajor, anchor.fVersionMinor, anchor.fVersionPatch);
builder.SetOnDiskHeaderSize(anchor.fNBytesHeader);
buffer = MakeUninitArray<unsigned char>(anchor.fLenHeader);
zipBuffer = MakeUninitArray<unsigned char>(anchor.fNBytesHeader);
if ((err = cont.ReadSingleAkey(zipBuffer.get(), anchor.fNBytesHeader, oidMetadata, kDistributionKeyDefault,
kAttributeKeyHeader, kCidMetadata)))
return err;
RNTupleDecompressor::Unzip(zipBuffer.get(), anchor.fNBytesHeader, anchor.fLenHeader, buffer.get());
RNTupleSerializer::DeserializeHeader(buffer.get(), anchor.fLenHeader, builder);

builder.AddToOnDiskFooterSize(anchor.fNBytesFooter);
buffer = MakeUninitArray<unsigned char>(anchor.fLenFooter);
zipBuffer = MakeUninitArray<unsigned char>(anchor.fNBytesFooter);
if ((err = cont.ReadSingleAkey(zipBuffer.get(), anchor.fNBytesFooter, oidMetadata, kDistributionKeyDefault,
kAttributeKeyFooter, kCidMetadata)))
return err;
RNTupleDecompressor::Unzip(zipBuffer.get(), anchor.fNBytesFooter, anchor.fLenFooter, buffer.get());
RNTupleSerializer::DeserializeFooter(buffer.get(), anchor.fLenFooter, builder);

return 0;
}
Expand All @@ -174,13 +160,7 @@ struct RDaosContainerNTupleLocator {
auto &loc = result.first;
auto &builder = result.second;

if (int err = loc.InitNTupleDescriptorBuilder(cont, builder); !err) {
if (ntupleName.empty() || ntupleName != builder.GetDescriptor().GetName()) {
// Hash already taken by a differently-named ntuple.
throw ROOT::RException(
R__FAIL("LocateNTuple: ntuple name '" + ntupleName + "' unavailable in this container."));
}
}
loc.InitNTupleDescriptorBuilder(cont, builder);
return result;
}
};
Expand Down Expand Up @@ -455,40 +435,67 @@ ROOT::Experimental::Internal::RPageSourceDaos::~RPageSourceDaos()
StopClusterPoolBackgroundThread();
}

ROOT::RNTupleDescriptor
ROOT::Experimental::Internal::RPageSourceDaos::AttachImpl(RNTupleSerializer::EDescriptorDeserializeMode mode)
void ROOT::Experimental::Internal::RPageSourceDaos::LoadStructureImpl()
{
ROOT::RNTupleDescriptor ntplDesc;
std::unique_ptr<unsigned char[]> buffer, zipBuffer;

auto [locator, descBuilder] = RDaosContainerNTupleLocator::LocateNTuple(*fDaosContainer, fNTupleName);
if (!locator.IsValid())
RDaosContainerNTupleLocator ntupleLocator;
std::tie(ntupleLocator, fDescriptorBuilder) =
RDaosContainerNTupleLocator::LocateNTuple(*fDaosContainer, fNTupleName);
if (!ntupleLocator.IsValid()) {
throw ROOT::RException(
R__FAIL("Attach: requested ntuple '" + fNTupleName + "' is not present in DAOS container."));
R__FAIL("LoadStructureImpl: requested ntuple '" + fNTupleName + "' is not present in DAOS container."));
}
fAnchor = *ntupleLocator.fAnchor;
fNTupleIndex = ntupleLocator.GetIndex();

auto oclass = RDaosObject::ObjClassId(locator.fAnchor->fObjClass);
auto oclass = RDaosObject::ObjClassId(fAnchor.fObjClass);
if (oclass.IsUnknown())
throw ROOT::RException(R__FAIL("Attach: unknown object class " + locator.fAnchor->fObjClass));

throw ROOT::RException(R__FAIL("LoadStructureImpl: unknown object class " + fAnchor.fObjClass));
fDaosContainer->SetDefaultObjectClass(oclass);
fNTupleIndex = locator.GetIndex();
daos_obj_id_t oidPageList{kOidLowPageList, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)};

auto desc = descBuilder.MoveDescriptor();
// Reserve enough space for the compressed and the uncompressed header/footer (see AttachImpl)
const auto bufSize =
fAnchor.fNBytesHeader + fAnchor.fNBytesFooter + std::max(fAnchor.fLenHeader, fAnchor.fLenFooter);
fStructureBuffer.fBuffer = MakeUninitArray<unsigned char>(bufSize);
fStructureBuffer.fPtrHeader = fStructureBuffer.fBuffer.get();
fStructureBuffer.fPtrFooter = fStructureBuffer.fBuffer.get() + fAnchor.fNBytesHeader;

int err;
daos_obj_id_t oidMetadata{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)};

if ((err = fDaosContainer->ReadSingleAkey(fStructureBuffer.fPtrHeader, fAnchor.fNBytesHeader, oidMetadata,
kDistributionKeyDefault, kAttributeKeyHeader, kCidMetadata))) {
throw ROOT::RException(R__FAIL("LoadStructureImpl: cannot load header: " + std::to_string(err)));
}

if ((err = fDaosContainer->ReadSingleAkey(fStructureBuffer.fPtrFooter, fAnchor.fNBytesFooter, oidMetadata,
kDistributionKeyDefault, kAttributeKeyFooter, kCidMetadata))) {
throw ROOT::RException(R__FAIL("LoadStructureImpl: cannot load footer: " + std::to_string(err)));
}
}

ROOT::RNTupleDescriptor ROOT::Experimental::Internal::RPageSourceDaos::AttachImpl()
{
auto unzipBuf = reinterpret_cast<unsigned char *>(fStructureBuffer.fPtrFooter) + fAnchor.fNBytesFooter;

for (const auto &cgDesc : desc.GetClusterGroupIterable()) {
buffer = MakeUninitArray<unsigned char>(cgDesc.GetPageListLength());
zipBuffer = MakeUninitArray<unsigned char>(cgDesc.GetPageListLocator().GetNBytesOnStorage());
fDaosContainer->ReadSingleAkey(
zipBuffer.get(), cgDesc.GetPageListLocator().GetNBytesOnStorage(), oidPageList, kDistributionKeyDefault,
cgDesc.GetPageListLocator().GetPosition<RNTupleLocatorObject64>().GetLocation(), kCidMetadata);
RNTupleDecompressor::Unzip(zipBuffer.get(), cgDesc.GetPageListLocator().GetNBytesOnStorage(),
cgDesc.GetPageListLength(), buffer.get());
RNTupleDecompressor::Unzip(fStructureBuffer.fPtrHeader, fAnchor.fNBytesHeader, fAnchor.fLenHeader, unzipBuf);
RNTupleSerializer::DeserializeHeader(unzipBuf, fAnchor.fLenHeader, fDescriptorBuilder);

RNTupleSerializer::DeserializePageList(buffer.get(), cgDesc.GetPageListLength(), cgDesc.GetId(), desc, mode);
RNTupleDecompressor::Unzip(fStructureBuffer.fPtrFooter, fAnchor.fNBytesFooter, fAnchor.fLenFooter, unzipBuf);
RNTupleSerializer::DeserializeFooter(unzipBuf, fAnchor.fLenFooter, fDescriptorBuilder);

if (fDescriptorBuilder.GetDescriptor().GetName() != fNTupleName) {
// Hash already taken by a differently-named ntuple.
throw ROOT::RException(R__FAIL("LocateNTuple: ntuple name '" + fNTupleName + "' unavailable in this container."));
}

return desc;
return fDescriptorBuilder.MoveDescriptor();
}

void ROOT::Experimental::Internal::RPageSourceDaos::LoadPageListImpl(const RNTupleLocator &locator, void *buffer)
{
daos_obj_id_t oidPageList{kOidLowPageList, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)};
fDaosContainer->ReadSingleAkey(buffer, locator.GetNBytesOnStorage(), oidPageList, kDistributionKeyDefault,
locator.GetPosition<RNTupleLocatorObject64>().GetLocation(), kCidMetadata);
}

std::string ROOT::Experimental::Internal::RPageSourceDaos::GetObjectClass() const
Expand Down
26 changes: 8 additions & 18 deletions tree/ntuple/src/RPageStorageFile.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ void ROOT::Internal::RPageSourceFile::LoadStructureImpl()
}
}

ROOT::RNTupleDescriptor ROOT::Internal::RPageSourceFile::AttachImpl(RNTupleSerializer::EDescriptorDeserializeMode mode)
ROOT::RNTupleDescriptor ROOT::Internal::RPageSourceFile::AttachImpl()
{
auto unzipBuf = reinterpret_cast<unsigned char *>(fStructureBuffer.fPtrFooter) + fAnchor->GetNBytesFooter();

Expand All @@ -489,33 +489,23 @@ ROOT::RNTupleDescriptor ROOT::Internal::RPageSourceFile::AttachImpl(RNTupleSeria
unzipBuf);
RNTupleSerializer::DeserializeFooter(unzipBuf, fAnchor->GetLenFooter(), fDescriptorBuilder);

auto desc = fDescriptorBuilder.MoveDescriptor();

// fNTupleName is empty if and only if we created this source via CreateFromAnchor. If that's the case, this is the
// earliest we can set the name.
if (fNTupleName.empty())
fNTupleName = desc.GetName();

std::vector<unsigned char> buffer;
for (const auto &cgDesc : desc.GetClusterGroupIterable()) {
buffer.resize(std::max<size_t>(buffer.size(),
cgDesc.GetPageListLength() + cgDesc.GetPageListLocator().GetNBytesOnStorage()));
auto *zipBuffer = buffer.data() + cgDesc.GetPageListLength();
fReader.ReadBuffer(zipBuffer, cgDesc.GetPageListLocator().GetNBytesOnStorage(),
cgDesc.GetPageListLocator().GetPosition<std::uint64_t>());
RNTupleDecompressor::Unzip(zipBuffer, cgDesc.GetPageListLocator().GetNBytesOnStorage(),
cgDesc.GetPageListLength(), buffer.data());

RNTupleSerializer::DeserializePageList(buffer.data(), cgDesc.GetPageListLength(), cgDesc.GetId(), desc, mode);
}
fNTupleName = fDescriptorBuilder.GetDescriptor().GetName();

// For the page reads, we rely on the I/O scheduler to define the read requests
fFile->SetBuffering(false);

// Set file size once after buffering is turned off
fFileSize = fFile->GetSize();

return desc;
return fDescriptorBuilder.MoveDescriptor();
}

void ROOT::Internal::RPageSourceFile::LoadPageListImpl(const RNTupleLocator &locator, void *buffer)
{
fReader.ReadBuffer(buffer, locator.GetNBytesOnStorage(), locator.GetPosition<std::uint64_t>());
}

void ROOT::Internal::RPageSourceFile::LoadSealedPageImpl(const RNTupleLocator &locator, RSealedPage &sealedPage)
Expand Down
3 changes: 2 additions & 1 deletion tree/ntuple/test/ntuple_cluster.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ namespace {
class RPageSourceMock : public RPageSource {
protected:
void LoadStructureImpl() final {}
RNTupleDescriptor AttachImpl(RNTupleSerializer::EDescriptorDeserializeMode) final { return RNTupleDescriptor(); }
RNTupleDescriptor AttachImpl() final { return RNTupleDescriptor(); }
std::unique_ptr<RPageSource> CloneImpl() const final { return nullptr; }
void LoadPageListImpl(const ROOT::RNTupleLocator &, void *) final {}
void LoadSealedPageImpl(const ROOT::RNTupleLocator &, RSealedPage &) final {}
void LoadStreamerInfo() final {}
std::unique_ptr<ROOT::Internal::RPageSource>
Expand Down
6 changes: 2 additions & 4 deletions tree/ntuple/test/ntuple_endian.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,9 @@ class RPageSourceMock : public RPageSource {

protected:
void LoadStructureImpl() final {}
RNTupleDescriptor AttachImpl(ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode) final
{
return RNTupleDescriptor();
}
RNTupleDescriptor AttachImpl() final { return RNTupleDescriptor(); }
std::unique_ptr<RPageSource> CloneImpl() const final { return nullptr; }
void LoadPageListImpl(const ROOT::RNTupleLocator &, void *) final {}
void LoadSealedPageImpl(const ROOT::RNTupleLocator &, RSealedPage &) final {}
void LoadStreamerInfo() final {}

Expand Down
3 changes: 2 additions & 1 deletion tree/ntuple/test/ntuple_pages.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ using ROOT::Internal::RPageRef;
class RPageSourceMock : public RPageSource {
protected:
void LoadStructureImpl() final {}
RNTupleDescriptor AttachImpl(RNTupleSerializer::EDescriptorDeserializeMode) final { return RNTupleDescriptor(); }
RNTupleDescriptor AttachImpl() final { return RNTupleDescriptor(); }
std::unique_ptr<RPageSource> CloneImpl() const final { return nullptr; }
void LoadPageListImpl(const ROOT::RNTupleLocator &, void *) final {}
void LoadSealedPageImpl(const ROOT::RNTupleLocator &, RSealedPage &) final {}
void LoadStreamerInfo() final {}
std::unique_ptr<ROOT::Internal::RPageSource>
Expand Down
Loading