diff --git a/include/openPMD/ReadIterations.hpp b/include/openPMD/ReadIterations.hpp index 35f2f740ce..baf5e90280 100644 --- a/include/openPMD/ReadIterations.hpp +++ b/include/openPMD/ReadIterations.hpp @@ -33,6 +33,7 @@ namespace openPMD { class SeriesIterator { + friend class Series; using iteration_index_t = IndexedIteration::index_t; using maybe_series_t = std::optional; diff --git a/include/openPMD/WriteIterations.hpp b/include/openPMD/WriteIterations.hpp index fcf4a8fbfa..fa5565bb79 100644 --- a/include/openPMD/WriteIterations.hpp +++ b/include/openPMD/WriteIterations.hpp @@ -105,5 +105,6 @@ class WriteIterations * Return the iteration that is currently being written to, if it exists. */ std::optional currentIteration(); + std::optional currentIterationIndex() const; }; } // namespace openPMD diff --git a/src/Series.cpp b/src/Series.cpp index d587575b44..63c35e9c57 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -2568,7 +2568,8 @@ auto Series::openIterationIfDirty(IterationIndex_t index, Iteration iteration) { return IterationOpened::RemainsClosed; } - bool const dirtyRecursive = iteration.dirtyRecursive(); + bool dirtyRecursive = iteration.dirtyRecursive(); + if (iteration.get().m_closed == internal::CloseStatus::ClosedInBackend) { // file corresponding with the iteration has previously been @@ -2589,6 +2590,33 @@ auto Series::openIterationIfDirty(IterationIndex_t index, Iteration iteration) return IterationOpened::RemainsClosed; } + /* + * When using writeIterations(), the currently active Iteration should + * always be flushed (unless, as checked above, it is already closed). + * These two blocks checks if an Iteration is currently collectively opened. + */ + auto &series = get(); + [&]() { + if (!series.m_sharedStatefulIterator) + { + return; + } + auto const current_iteration = + series.m_sharedStatefulIterator->peekCurrentIteration(); + dirtyRecursive |= + current_iteration.has_value() && *current_iteration == index; + }(); + [&]() { + if (!series.m_writeIterations) + { + return; + } + auto const current_iteration = + series.m_writeIterations->currentIterationIndex(); + dirtyRecursive |= + current_iteration.has_value() && *current_iteration == index; + }(); + switch (iterationEncoding()) { using IE = IterationEncoding; diff --git a/src/WriteIterations.cpp b/src/WriteIterations.cpp index 0ae7246ae0..55ae10b9b9 100644 --- a/src/WriteIterations.cpp +++ b/src/WriteIterations.cpp @@ -104,16 +104,13 @@ WriteIterations::mapped_type &WriteIterations::operator[](key_type &&key) std::optional WriteIterations::currentIteration() { - if (!shared || !shared->has_value()) + auto current_index = currentIterationIndex(); + if (!current_index.has_value()) { return std::nullopt; } auto &s = shared->value(); - if (!s.currentlyOpen.has_value()) - { - return std::nullopt; - } - Iteration ¤tIteration = s.iterations.at(s.currentlyOpen.value()); + Iteration ¤tIteration = s.iterations.at(current_index.value()); if (currentIteration.closed()) { return std::nullopt; @@ -121,4 +118,15 @@ std::optional WriteIterations::currentIteration() return std::make_optional( IndexedIteration(currentIteration, s.currentlyOpen.value())); } + +std::optional +WriteIterations::currentIterationIndex() const +{ + if (!shared || !shared->has_value()) + { + return std::nullopt; + } + auto &s = shared->value(); + return s.currentlyOpen; +} } // namespace openPMD diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index 58dc0b06a2..a6c8e68375 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -2202,3 +2202,44 @@ TEST_CASE("adios2_flush_via_step") #endif #endif // openPMD_HAVE_ADIOS2 && openPMD_HAVE_MPI + +#if openPMD_HAVE_MPI +auto bug_1655_bp5_writer_hangup(std::string const &ext) -> void +{ + int mpi_size; + int mpi_rank; + + MPI_Comm_size(MPI_COMM_WORLD, &mpi_size); + MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); + + auto const value = float(mpi_size * 100 + mpi_rank); + std::vector local_data(10 * 300, value); + + std::string filename = "../samples/ptl_%T." + ext; + + Series series = Series(filename, Access::CREATE, MPI_COMM_WORLD); + + Datatype datatype = determineDatatype(); + + auto myptl = series.writeIterations()[1].particles["ion"]; + Extent global_ptl = {10ul * mpi_size * 300}; + Dataset dataset_ptl = Dataset(datatype, global_ptl, "{}"); + myptl["charge"].resetDataset(dataset_ptl); + + series.flush(); + + if (mpi_rank == 0) // only rank 0 adds data + myptl["charge"].storeChunk(local_data, {0}, {3000}); + + series.flush(); // hangs here + series.close(); +} + +TEST_CASE("bug_1655_bp5_writer_hangup", "[parallel]") +{ + for (auto const &ext : testedFileExtensions()) + { + bug_1655_bp5_writer_hangup(ext); + } +} +#endif