diff --git a/CMakeLists.txt b/CMakeLists.txt index b3a0cc30c2..3af9872195 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -791,6 +791,7 @@ if(openPMD_BUILD_TESTING) list(APPEND ${out_list} test/Files_ParallelIO/read_variablebased_randomaccess.cpp test/Files_ParallelIO/iterate_nonstreaming_series.cpp + test/Files_ParallelIO/bug_1655_bp5_writer_hangup.cpp ) elseif(${test_name} STREQUAL "Core") list(APPEND ${out_list} diff --git a/src/Series.cpp b/src/Series.cpp index 2fe870a4a9..c0733e34cc 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -2702,7 +2702,8 @@ auto Series::openIterationIfDirty(IterationIndex_t index, Iteration &iteration) { return IterationOpened::RemainsClosed; } - bool const dirtyRecursive = iteration.dirtyRecursive(); + bool dirtyRecursive = iteration.dirtyRecursive(); + if (data.m_closed == internal::CloseStatus::Closed) { // file corresponding with the iteration has previously been @@ -2731,6 +2732,28 @@ auto Series::openIterationIfDirty(IterationIndex_t index, Iteration &iteration) } } + /* + * When using writeIterations(), the currently active Iteration should + * always be flushed (unless, as checked above, it is already closed). + * This block checks if an Iteration is currently collectively opened. 
+ */ + [&]() { + auto &series = get(); + if (!series.m_sharedStatefulIterator) + { + return; + } + auto &shared_iterator = *series.m_sharedStatefulIterator; + if (!shared_iterator.m_data || !shared_iterator.m_data->has_value()) + { + return; + } + auto const current_iteration = + (*shared_iterator.m_data)->currentIteration(); + dirtyRecursive |= + current_iteration.has_value() && *current_iteration == index; + }(); + switch (iterationEncoding()) { using IE = IterationEncoding; diff --git a/test/Files_ParallelIO/ParallelIOTests.hpp b/test/Files_ParallelIO/ParallelIOTests.hpp index 0cb247700e..84d68a214d 100644 --- a/test/Files_ParallelIO/ParallelIOTests.hpp +++ b/test/Files_ParallelIO/ParallelIOTests.hpp @@ -75,4 +75,9 @@ namespace iterate_nonstreaming_series auto iterate_nonstreaming_series() -> void; } +namespace bug_1655_bp5_writer_hangup +{ +auto bug_1655_bp5_writer_hangup() -> void; +} + #endif diff --git a/test/Files_ParallelIO/bug_1655_bp5_writer_hangup.cpp b/test/Files_ParallelIO/bug_1655_bp5_writer_hangup.cpp new file mode 100644 index 0000000000..103c884631 --- /dev/null +++ b/test/Files_ParallelIO/bug_1655_bp5_writer_hangup.cpp @@ -0,0 +1,46 @@ +#include "ParallelIOTests.hpp" + +#include <openPMD/openPMD.hpp> +#include <mpi.h> + +namespace bug_1655_bp5_writer_hangup +{ +auto worker(std::string const &ext) -> void +{ + int mpi_size; + int mpi_rank; + + MPI_Comm_size(MPI_COMM_WORLD, &mpi_size); + MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); + + auto const value = float(mpi_size * 100 + mpi_rank); + std::vector<float> local_data(10 * 300, value); + + std::string filename = "../samples/ptl_%T." 
+ ext; + + Series series = Series(filename, Access::CREATE, MPI_COMM_WORLD); + + Datatype datatype = determineDatatype<float>(); + + auto myptl = series.writeIterations()[1].particles["ion"]; + Extent global_ptl = {10ul * mpi_size * 300}; + Dataset dataset_ptl = Dataset(datatype, global_ptl, "{}"); + myptl["charge"].resetDataset(dataset_ptl); + + series.flush(); + + if (mpi_rank == 0) // only rank 0 adds data + myptl["charge"].storeChunk(local_data, {0}, {3000}); + + series.flush(); // hangs here + series.close(); +} + +auto bug_1655_bp5_writer_hangup() -> void +{ + for (auto const &ext : testedFileExtensions()) + { + worker(ext); + } +} +} // namespace bug_1655_bp5_writer_hangup diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index 9ade0c8f44..2daa369920 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -2183,3 +2183,10 @@ TEST_CASE("iterate_nonstreaming_series", "[serial][adios2]") } #endif // openPMD_HAVE_ADIOS2 && openPMD_HAVE_MPI + +#if openPMD_HAVE_MPI +TEST_CASE("bug_1655_bp5_writer_hangup", "[parallel]") +{ + bug_1655_bp5_writer_hangup::bug_1655_bp5_writer_hangup(); +} +#endif