diff --git a/src/Iteration.cpp b/src/Iteration.cpp index d50d7d6a2f..ba04e73981 100644 --- a/src/Iteration.cpp +++ b/src/Iteration.cpp @@ -24,6 +24,7 @@ #include "openPMD/Error.hpp" #include "openPMD/IO/AbstractIOHandler.hpp" #include "openPMD/IO/IOTask.hpp" +#include "openPMD/IterationEncoding.hpp" #include "openPMD/Series.hpp" #include "openPMD/Streaming.hpp" #include "openPMD/auxiliary/DerefDynamicCast.hpp" @@ -881,7 +882,9 @@ auto Iteration::beginStep( } } - res.stepStatus = status; + res.stepStatus = series.iterationEncoding() == IterationEncoding::fileBased + ? AdvanceStatus::RANDOMACCESS + : status; return res; } diff --git a/src/snapshots/ContainerImpls.cpp b/src/snapshots/ContainerImpls.cpp index 9e7adef748..0542540e7d 100644 --- a/src/snapshots/ContainerImpls.cpp +++ b/src/snapshots/ContainerImpls.cpp @@ -6,6 +6,7 @@ #include "openPMD/snapshots/IteratorHelpers.hpp" #include "openPMD/snapshots/RandomAccessIterator.hpp" #include "openPMD/snapshots/StatefulIterator.hpp" +#include #include #include #include @@ -209,79 +210,82 @@ auto StatefulSnapshotsContainer::operator[](key_type const &key) { throw std::runtime_error("Stateful iteration on a read-write Series."); } - if (access::write(access)) + else if ( + access::read(access) || + s.series.iterations.find(key) != s.series.iterations.end()) { - auto lastIteration = base_iterator->peekCurrentlyOpenIteration(); - if (lastIteration.has_value()) + return at(key); + } + + assert(access::write(access)); + + auto lastIteration = base_iterator->peekCurrentlyOpenIteration(); + if (lastIteration.has_value()) + { + auto lastIteration_v = lastIteration.value(); + if (lastIteration_v->first == key) { - auto lastIteration_v = lastIteration.value(); - if (lastIteration_v->first == key) - { - return s.series.iterations.at(key); - } - else - { - lastIteration_v->second.close(); // continue below - } + return s.series.iterations.at(key); } - if (auto it = s.series.iterations.find(key); - it == s.series.iterations.end()) + else { - s.currentStep.map_during_t( - [&](detail::CurrentStep::During_t &during) { - ++during.step_count; - base_iterator->get().seen_iterations[key] = - during.step_count; - during.iteration_idx = key; - during.available_iterations_in_step = {key}; - }, - [&](detail::CurrentStep::AtTheEdge where_am_i) - -> detail::CurrentStep::During_t { - base_iterator->get().seen_iterations[key] = 0; - switch (where_am_i) - { - case detail::CurrentStep::AtTheEdge::Begin: - return detail::CurrentStep::During_t{0, key, {key}}; - case detail::CurrentStep::AtTheEdge::End: - throw error::Internal( - "Trying to create a new output step, but the " - "stream is " - "closed?"); - } - throw std::runtime_error("Unreachable!"); - }); + lastIteration_v->second.close(); // continue below } - auto &res = s.series.iterations[key]; - if (res.getStepStatus() != StepStatus::DuringStep) + } + + // create new + auto &res = s.series.iterations[key]; + Iteration::BeginStepStatus status = [&]() { + try { - try + return res.beginStep(/* reread = */ false); + } + catch (error::OperationUnsupportedInBackend const &) + { + s.series.iterations.retrieveSeries() + .get() + .m_currentlyActiveIterations.clear(); + throw; + } + }(); + res.setStepStatus(StepStatus::DuringStep); + + s.currentStep.map_during_t( + [&](detail::CurrentStep::During_t &during) { + switch (status.stepStatus) { - res.beginStep(/* reread = */ false); + case AdvanceStatus::OK: + ++during.step_count; + during.available_iterations_in_step = {key}; + break; + case AdvanceStatus::RANDOMACCESS: + during.available_iterations_in_step.emplace_back(key); + break; + case AdvanceStatus::OVER: + throw error::Internal( + "Backend reported OVER status while trying to create " + "new Iteration."); } - catch (error::OperationUnsupportedInBackend const &) + base_iterator->get().seen_iterations[key] = during.step_count; + during.iteration_idx = key; + }, + [&](detail::CurrentStep::AtTheEdge where_am_i) + -> detail::CurrentStep::During_t { + base_iterator->get().seen_iterations[key] = 0; + switch (where_am_i) { - s.series.iterations.retrieveSeries() - .get() - .m_currentlyActiveIterations.clear(); - throw; + case detail::CurrentStep::AtTheEdge::Begin: + return detail::CurrentStep::During_t{0, key, {key}}; + case detail::CurrentStep::AtTheEdge::End: + throw error::Internal( + "Trying to create a new output step, but the " + "stream is " + "closed?"); } - res.setStepStatus(StepStatus::DuringStep); - } - return res; - } - else if (access::read(access)) - { - auto &result = base_iterator->seek( - {StatefulIterator::Seek::Seek_Iteration_t{key}}); - if (result.is_end()) - { - throw std::out_of_range( - "[StatefulSnapshotsContainer::operator[]()] Cannot (yet) skip " - "to a Snapshot from an I/O step that is not active."); - } - return result->second; - } - throw error::Internal("Control flow error: This should be unreachable."); + throw std::runtime_error("Unreachable!"); + }); + + return res; } auto StatefulSnapshotsContainer::clear() -> void @@ -352,14 +356,14 @@ RandomAccessIteratorContainer &RandomAccessIteratorContainer::operator=( auto RandomAccessIteratorContainer::currentIteration() const -> std::optional { - if (auto begin = m_cont.begin(); begin != m_cont.end()) + for (auto begin = m_cont.rbegin(); begin != m_cont.rend(); ++begin) { - return std::make_optional(&*begin); - } - else - { - return std::nullopt; + if (!begin->second.closed()) + { + return std::make_optional(&*begin); + } } + return std::nullopt; } auto RandomAccessIteratorContainer::begin() -> iterator diff --git a/test/Files_SerialIO/close_and_reopen_test.cpp b/test/Files_SerialIO/close_and_reopen_test.cpp index ddd25e719e..b25fc79f5f 100644 --- a/test/Files_SerialIO/close_and_reopen_test.cpp +++ b/test/Files_SerialIO/close_and_reopen_test.cpp @@ -20,7 +20,8 @@ template auto run_test_filebased( Access writeAccess, WriteIterations &&writeIterations, - std::string const &ext) + std::string const &ext, + bool synchronous) { std::string filename = "../samples/close_iteration_reopen/filebased_%T." + ext; @@ -39,6 +40,11 @@ auto run_test_filebased( B_y.resetDataset({Datatype::INT, {5}}); B_y.storeChunk(data, {0}, {5}); it.close(); + // This also verifies that operator[] and at() can be used to access the + // Iteration after closing + REQUIRE(series.iterations.at(0).closed()); + REQUIRE(writeIterations(series)[0].closed() == !synchronous); + REQUIRE(writeIterations(series).at(0).closed() == !synchronous); } { @@ -54,6 +60,14 @@ auto run_test_filebased( e_position_x.resetDataset({Datatype::INT, {5}}); e_position_x.storeChunk(data, {0}, {5}); it.close(); + REQUIRE(series.iterations.at(1).closed()); + REQUIRE(writeIterations(series).at(1).closed() == !synchronous); + REQUIRE(writeIterations(series)[1].closed() == !synchronous); + // We are in file-based iteration encoding, so the old iteration should + // remain accessible + // Note: this will create a particlespath at iteration 0, which will + // lead to parsing warnings in HDF5. + writeIterations(series).at(0); } { auto it = writeIterations(series)[2]; @@ -172,7 +186,8 @@ auto run_test_groupbased( Access writeAccess, WriteIterations &&writeIterations, std::string const &ext, - std::vector const &readModes) + std::vector const &readModes, + bool synchronous) { std::string filename = "../samples/close_iteration_reopen/groupbased." + ext; @@ -202,6 +217,18 @@ auto run_test_groupbased( B_y.resetDataset({Datatype::INT, {5}}); B_y.storeChunk(data, {0}, {5}); it.close(); + // This also verifies that operator[] and at() can be used to access the + // Iteration after closing + REQUIRE(series.iterations.at(0).closed()); + REQUIRE(writeIterations(series)[0].closed() == !synchronous); + REQUIRE(writeIterations(series).at(0).closed() == !synchronous); + if (synchronous) + { + // we opened a new step, need to do something in it now, + // otherwise we get a corrupted file + B_y.storeChunk(data, {0}, {5}); + it.close(); + } } { @@ -217,6 +244,19 @@ auto run_test_groupbased( E_y.resetDataset({Datatype::INT, {5}}); E_y.storeChunk(data, {0}, {5}); it.close(); + + if (!synchronous || series.backend() != "ADIOS2") + { + writeIterations(series).at(0); + } + else + { + // Cannot go back to an old IO step + // Since the other backends do not use IO steps, + // going back to an old Iteration should remain possible even + // in synchronous modes + REQUIRE_THROWS(writeIterations(series).at(0)); + } } { auto it = writeIterations(series)[2]; @@ -281,19 +321,35 @@ auto close_and_reopen_test() -> void for (auto writeAccess : {Access::CREATE_RANDOM_ACCESS, Access::CREATE_LINEAR}) { + bool synchronous = writeAccess == Access::CREATE_LINEAR; run_test_filebased( - writeAccess, [](Series &s) { return s.iterations; }, "bp"); + writeAccess, [](Series &s) { return s.iterations; }, "bp", false); run_test_filebased( - writeAccess, [](Series &s) { return s.writeIterations(); }, "bp"); + writeAccess, + [](Series &s) { return s.writeIterations(); }, + "bp", + true); run_test_filebased( - writeAccess, [](Series &s) { return s.snapshots(); }, "bp"); + writeAccess, + [](Series &s) { return s.snapshots(); }, + "bp", + synchronous); run_test_filebased( - writeAccess, [](Series &s) { return s.snapshots(); }, "bp"); + writeAccess, + [](Series &s) { return s.snapshots(); }, + "bp", + synchronous); run_test_filebased( - writeAccess, [](Series &s) { return s.snapshots(); }, "json"); + writeAccess, + [](Series &s) { return s.snapshots(); }, + "json", + synchronous); #if openPMD_HAVE_HDF5 run_test_filebased( - writeAccess, [](Series &s) { return s.snapshots(); }, "h5"); + writeAccess, + [](Series &s) { return s.snapshots(); }, + "h5", + synchronous); #endif /* @@ -304,37 +360,43 @@ auto close_and_reopen_test() -> void writeAccess, [](Series &s) { return s.iterations; }, "bp4", - {Access::READ_ONLY, Access::READ_LINEAR}); + {Access::READ_ONLY, Access::READ_LINEAR}, + false); // since these write data in a way that distributes one iteration's data // over multiple steps, only random access read mode makes sense run_test_groupbased( writeAccess, [](Series &s) { return s.writeIterations(); }, "bp4", - {Access::READ_RANDOM_ACCESS}); + {Access::READ_RANDOM_ACCESS}, + true); run_test_groupbased( writeAccess, [](Series &s) { return s.snapshots(); }, "bp4", - {Access::READ_RANDOM_ACCESS}); + {Access::READ_RANDOM_ACCESS}, + synchronous); // that doesnt matter for json tho run_test_groupbased( writeAccess, [](Series &s) { return s.snapshots(); }, "json", - {Access::READ_RANDOM_ACCESS, Access::READ_LINEAR}); + {Access::READ_RANDOM_ACCESS, Access::READ_LINEAR}, + synchronous); #if openPMD_HAVE_HDF5 run_test_groupbased( writeAccess, [](Series &s) { return s.snapshots(); }, "h5", - {Access::READ_RANDOM_ACCESS, Access::READ_LINEAR}); + {Access::READ_RANDOM_ACCESS, Access::READ_LINEAR}, + synchronous); #endif run_test_groupbased( writeAccess, [](Series &s) { return s.snapshots(); }, "json", - {Access::READ_RANDOM_ACCESS, Access::READ_LINEAR}); + {Access::READ_RANDOM_ACCESS, Access::READ_LINEAR}, + synchronous); } } #else