Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/Iteration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "openPMD/Error.hpp"
#include "openPMD/IO/AbstractIOHandler.hpp"
#include "openPMD/IO/IOTask.hpp"
#include "openPMD/IterationEncoding.hpp"
#include "openPMD/Series.hpp"
#include "openPMD/Streaming.hpp"
#include "openPMD/auxiliary/DerefDynamicCast.hpp"
Expand Down Expand Up @@ -881,7 +882,9 @@ auto Iteration::beginStep(
}
}

res.stepStatus = status;
res.stepStatus = series.iterationEncoding() == IterationEncoding::fileBased
? AdvanceStatus::RANDOMACCESS
: status;
return res;
}

Expand Down
142 changes: 73 additions & 69 deletions src/snapshots/ContainerImpls.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "openPMD/snapshots/IteratorHelpers.hpp"
#include "openPMD/snapshots/RandomAccessIterator.hpp"
#include "openPMD/snapshots/StatefulIterator.hpp"
#include <cassert>
#include <memory>
#include <optional>
#include <stdexcept>
Expand Down Expand Up @@ -209,79 +210,82 @@ auto StatefulSnapshotsContainer::operator[](key_type const &key)
{
throw std::runtime_error("Stateful iteration on a read-write Series.");
}
if (access::write(access))
else if (
access::read(access) ||
s.series.iterations.find(key) != s.series.iterations.end())
{
auto lastIteration = base_iterator->peekCurrentlyOpenIteration();
if (lastIteration.has_value())
return at(key);
}

assert(access::write(access));

auto lastIteration = base_iterator->peekCurrentlyOpenIteration();
if (lastIteration.has_value())
{
auto lastIteration_v = lastIteration.value();
if (lastIteration_v->first == key)
{
auto lastIteration_v = lastIteration.value();
if (lastIteration_v->first == key)
{
return s.series.iterations.at(key);
}
else
{
lastIteration_v->second.close(); // continue below
}
return s.series.iterations.at(key);
}
if (auto it = s.series.iterations.find(key);
it == s.series.iterations.end())
else
{
s.currentStep.map_during_t(
[&](detail::CurrentStep::During_t &during) {
++during.step_count;
base_iterator->get().seen_iterations[key] =
during.step_count;
during.iteration_idx = key;
during.available_iterations_in_step = {key};
},
[&](detail::CurrentStep::AtTheEdge where_am_i)
-> detail::CurrentStep::During_t {
base_iterator->get().seen_iterations[key] = 0;
switch (where_am_i)
{
case detail::CurrentStep::AtTheEdge::Begin:
return detail::CurrentStep::During_t{0, key, {key}};
case detail::CurrentStep::AtTheEdge::End:
throw error::Internal(
"Trying to create a new output step, but the "
"stream is "
"closed?");
}
throw std::runtime_error("Unreachable!");
});
lastIteration_v->second.close(); // continue below
}
auto &res = s.series.iterations[key];
if (res.getStepStatus() != StepStatus::DuringStep)
}

// create new
auto &res = s.series.iterations[key];
Iteration::BeginStepStatus status = [&]() {
try
{
try
return res.beginStep(/* reread = */ false);
}
catch (error::OperationUnsupportedInBackend const &)
{
s.series.iterations.retrieveSeries()
.get()
.m_currentlyActiveIterations.clear();
throw;
}
}();
res.setStepStatus(StepStatus::DuringStep);

s.currentStep.map_during_t(
[&](detail::CurrentStep::During_t &during) {
switch (status.stepStatus)
{
res.beginStep(/* reread = */ false);
case AdvanceStatus::OK:
++during.step_count;
during.available_iterations_in_step = {key};
break;
case AdvanceStatus::RANDOMACCESS:
during.available_iterations_in_step.emplace_back(key);
break;
case AdvanceStatus::OVER:
throw error::Internal(
"Backend reported OVER status while trying to create "
"new Iteration.");
}
catch (error::OperationUnsupportedInBackend const &)
base_iterator->get().seen_iterations[key] = during.step_count;
during.iteration_idx = key;
},
[&](detail::CurrentStep::AtTheEdge where_am_i)
-> detail::CurrentStep::During_t {
base_iterator->get().seen_iterations[key] = 0;
switch (where_am_i)
{
s.series.iterations.retrieveSeries()
.get()
.m_currentlyActiveIterations.clear();
throw;
case detail::CurrentStep::AtTheEdge::Begin:
return detail::CurrentStep::During_t{0, key, {key}};
case detail::CurrentStep::AtTheEdge::End:
throw error::Internal(
"Trying to create a new output step, but the "
"stream is "
"closed?");
}
res.setStepStatus(StepStatus::DuringStep);
}
return res;
}
else if (access::read(access))
{
auto &result = base_iterator->seek(
{StatefulIterator::Seek::Seek_Iteration_t{key}});
if (result.is_end())
{
throw std::out_of_range(
"[StatefulSnapshotsContainer::operator[]()] Cannot (yet) skip "
"to a Snapshot from an I/O step that is not active.");
}
return result->second;
}
throw error::Internal("Control flow error: This should be unreachable.");
throw std::runtime_error("Unreachable!");
});

return res;
}

auto StatefulSnapshotsContainer::clear() -> void
Expand Down Expand Up @@ -352,14 +356,14 @@ RandomAccessIteratorContainer &RandomAccessIteratorContainer::operator=(
auto RandomAccessIteratorContainer::currentIteration() const
-> std::optional<value_type const *>
{
if (auto begin = m_cont.begin(); begin != m_cont.end())
for (auto begin = m_cont.rbegin(); begin != m_cont.rend(); ++begin)
{
return std::make_optional<value_type const *>(&*begin);
}
else
{
return std::nullopt;
if (!begin->second.closed())
{
return std::make_optional<value_type const *>(&*begin);
}
}
return std::nullopt;
}

auto RandomAccessIteratorContainer::begin() -> iterator
Expand Down
90 changes: 76 additions & 14 deletions test/Files_SerialIO/close_and_reopen_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ template <typename WriteIterations>
auto run_test_filebased(
Access writeAccess,
WriteIterations &&writeIterations,
std::string const &ext)
std::string const &ext,
bool synchronous)
{
std::string filename =
"../samples/close_iteration_reopen/filebased_%T." + ext;
Expand All @@ -39,6 +40,11 @@ auto run_test_filebased(
B_y.resetDataset({Datatype::INT, {5}});
B_y.storeChunk(data, {0}, {5});
it.close();
// This also verifies that operator[] and at() can be used to access the
// Iteration after closing
REQUIRE(series.iterations.at(0).closed());
REQUIRE(writeIterations(series)[0].closed() == !synchronous);
REQUIRE(writeIterations(series).at(0).closed() == !synchronous);
}

{
Expand All @@ -54,6 +60,14 @@ auto run_test_filebased(
e_position_x.resetDataset({Datatype::INT, {5}});
e_position_x.storeChunk(data, {0}, {5});
it.close();
REQUIRE(series.iterations.at(1).closed());
REQUIRE(writeIterations(series).at(1).closed() == !synchronous);
REQUIRE(writeIterations(series)[1].closed() == !synchronous);
// We are in file-based iteration encoding, so the old iteration should
// remain accessible
// Note: this will create a particlespath at iteration 0, which will
// lead to parsing warnings in HDF5.
writeIterations(series).at(0);
}
{
auto it = writeIterations(series)[2];
Expand Down Expand Up @@ -172,7 +186,8 @@ auto run_test_groupbased(
Access writeAccess,
WriteIterations &&writeIterations,
std::string const &ext,
std::vector<Access> const &readModes)
std::vector<Access> const &readModes,
bool synchronous)
{
std::string filename =
"../samples/close_iteration_reopen/groupbased." + ext;
Expand Down Expand Up @@ -202,6 +217,18 @@ auto run_test_groupbased(
B_y.resetDataset({Datatype::INT, {5}});
B_y.storeChunk(data, {0}, {5});
it.close();
// This also verifies that operator[] and at() can be used to access the
// Iteration after closing
REQUIRE(series.iterations.at(0).closed());
REQUIRE(writeIterations(series)[0].closed() == !synchronous);
REQUIRE(writeIterations(series).at(0).closed() == !synchronous);
if (synchronous)
{
// we opened a new step, need to do something in it now,
// otherwise we get a corrupted file
B_y.storeChunk(data, {0}, {5});
it.close();
}
}

{
Expand All @@ -217,6 +244,19 @@ auto run_test_groupbased(
E_y.resetDataset({Datatype::INT, {5}});
E_y.storeChunk(data, {0}, {5});
it.close();

if (!synchronous || series.backend() != "ADIOS2")
{
writeIterations(series).at(0);
}
else
{
// Cannot go back to an old IO step
// Since the other backends do not use IO steps,
// going back to an old Iteration should remain possible even
// in synchronous modes
REQUIRE_THROWS(writeIterations(series).at(0));
}
}
{
auto it = writeIterations(series)[2];
Expand Down Expand Up @@ -281,19 +321,35 @@ auto close_and_reopen_test() -> void
for (auto writeAccess :
{Access::CREATE_RANDOM_ACCESS, Access::CREATE_LINEAR})
{
bool synchronous = writeAccess == Access::CREATE_LINEAR;
run_test_filebased(
writeAccess, [](Series &s) { return s.iterations; }, "bp");
writeAccess, [](Series &s) { return s.iterations; }, "bp", false);
run_test_filebased(
writeAccess, [](Series &s) { return s.writeIterations(); }, "bp");
writeAccess,
[](Series &s) { return s.writeIterations(); },
"bp",
true);
run_test_filebased(
writeAccess, [](Series &s) { return s.snapshots(); }, "bp");
writeAccess,
[](Series &s) { return s.snapshots(); },
"bp",
synchronous);
run_test_filebased(
writeAccess, [](Series &s) { return s.snapshots(); }, "bp");
writeAccess,
[](Series &s) { return s.snapshots(); },
"bp",
synchronous);
run_test_filebased(
writeAccess, [](Series &s) { return s.snapshots(); }, "json");
writeAccess,
[](Series &s) { return s.snapshots(); },
"json",
synchronous);
#if openPMD_HAVE_HDF5
run_test_filebased(
writeAccess, [](Series &s) { return s.snapshots(); }, "h5");
writeAccess,
[](Series &s) { return s.snapshots(); },
"h5",
synchronous);
#endif

/*
Expand All @@ -304,37 +360,43 @@ auto close_and_reopen_test() -> void
writeAccess,
[](Series &s) { return s.iterations; },
"bp4",
{Access::READ_ONLY, Access::READ_LINEAR});
{Access::READ_ONLY, Access::READ_LINEAR},
false);
// since these write data in a way that distributes one iteration's data
// over multiple steps, only random access read mode makes sense
run_test_groupbased(
writeAccess,
[](Series &s) { return s.writeIterations(); },
"bp4",
{Access::READ_RANDOM_ACCESS});
{Access::READ_RANDOM_ACCESS},
true);
run_test_groupbased(
writeAccess,
[](Series &s) { return s.snapshots(); },
"bp4",
{Access::READ_RANDOM_ACCESS});
{Access::READ_RANDOM_ACCESS},
synchronous);
// that doesnt matter for json tho
run_test_groupbased(
writeAccess,
[](Series &s) { return s.snapshots(); },
"json",
{Access::READ_RANDOM_ACCESS, Access::READ_LINEAR});
{Access::READ_RANDOM_ACCESS, Access::READ_LINEAR},
synchronous);
#if openPMD_HAVE_HDF5
run_test_groupbased(
writeAccess,
[](Series &s) { return s.snapshots(); },
"h5",
{Access::READ_RANDOM_ACCESS, Access::READ_LINEAR});
{Access::READ_RANDOM_ACCESS, Access::READ_LINEAR},
synchronous);
#endif
run_test_groupbased(
writeAccess,
[](Series &s) { return s.snapshots(); },
"json",
{Access::READ_RANDOM_ACCESS, Access::READ_LINEAR});
{Access::READ_RANDOM_ACCESS, Access::READ_LINEAR},
synchronous);
}
}
#else
Expand Down
Loading