diff --git a/src/Series.cpp b/src/Series.cpp index 7e0dcbe895..19c2de72b3 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -547,18 +547,18 @@ SeriesInterface::flushFileBased( iterations_iterator begin, iterations_iterator if( IOHandler()->m_frontendAccess == Access::READ_ONLY ) for( auto it = begin; it != end; ++it ) { + // Phase 1 switch( openIterationIfDirty( it->first, it->second ) ) { using IO = IterationOpened; - case IO::RemainsClosed: - continue; case IO::HasBeenOpened: - // continue below + it->second.flush(); + break; + case IO::RemainsClosed: break; } - it->second.flush(); - + // Phase 2 if( *it->second.m_closed == Iteration::CloseStatus::ClosedInFrontend ) { @@ -567,6 +567,8 @@ SeriesInterface::flushFileBased( iterations_iterator begin, iterations_iterator IOTask( &it->second, std::move( fClose ) ) ); *it->second.m_closed = Iteration::CloseStatus::ClosedInBackend; } + + // Phase 3 IOHandler()->flush(); } else @@ -574,31 +576,33 @@ SeriesInterface::flushFileBased( iterations_iterator begin, iterations_iterator bool allDirty = dirty(); for( auto it = begin; it != end; ++it ) { + // Phase 1 switch( openIterationIfDirty( it->first, it->second ) ) { using IO = IterationOpened; - case IO::RemainsClosed: - continue; - case IO::HasBeenOpened: - // continue below - break; - } - - /* as there is only one series, - * emulate the file belonging to each iteration as not yet written - */ - written() = false; - series.iterations.written() = false; + case IO::HasBeenOpened: { + /* as there is only one series, + * emulate the file belonging to each iteration as not yet + * written + */ + written() = false; + series.iterations.written() = false; - dirty() |= it->second.dirty(); - std::string filename = iterationFilename( it->first ); - it->second.flushFileBased( filename, it->first ); + dirty() |= it->second.dirty(); + std::string filename = iterationFilename( it->first ); + it->second.flushFileBased( filename, it->first ); - series.iterations.flush( - auxiliary::replace_first( basePath(), "%T/", "" ) ); + series.iterations.flush( + auxiliary::replace_first( basePath(), "%T/", "" ) ); - flushAttributes(); + flushAttributes(); + break; + } + case IO::RemainsClosed: + break; + } + // Phase 2 if( *it->second.m_closed == Iteration::CloseStatus::ClosedInFrontend ) { @@ -608,6 +612,7 @@ SeriesInterface::flushFileBased( iterations_iterator begin, iterations_iterator *it->second.m_closed = Iteration::CloseStatus::ClosedInBackend; } + // Phase 3 IOHandler()->flush(); /* reset the dirty bit for every iteration (i.e. file) @@ -625,23 +630,26 @@ SeriesInterface::flushGorVBased( iterations_iterator begin, iterations_iterator if( IOHandler()->m_frontendAccess == Access::READ_ONLY ) for( auto it = begin; it != end; ++it ) { + // Phase 1 switch( openIterationIfDirty( it->first, it->second ) ) { using IO = IterationOpened; - case IO::RemainsClosed: - continue; case IO::HasBeenOpened: - // continue below + it->second.flush(); + break; + case IO::RemainsClosed: break; } - it->second.flush(); + // Phase 2 if( *it->second.m_closed == Iteration::CloseStatus::ClosedInFrontend ) { // the iteration has no dedicated file in group-based mode *it->second.m_closed = Iteration::CloseStatus::ClosedInBackend; } + + // Phase 3 IOHandler()->flush(); } else @@ -654,26 +662,23 @@ SeriesInterface::flushGorVBased( iterations_iterator begin, iterations_iterator IOHandler()->enqueue(IOTask(this, fCreate)); } - series.iterations.flush(auxiliary::replace_first(basePath(), "%T/", "")); + series.iterations.flush( + auxiliary::replace_first( basePath(), "%T/", "" ) ); for( auto it = begin; it != end; ++it ) { + // Phase 1 switch( openIterationIfDirty( it->first, it->second ) ) { using IO = IterationOpened; - case IO::RemainsClosed: - continue; case IO::HasBeenOpened: - // continue below - break; - } - if( !it->second.written() ) - { - it->second.parent() = getWritable( &series.iterations ); - } - switch( iterationEncoding() ) - { - using IE = IterationEncoding; + if( !it->second.written() ) + { + it->second.parent() = getWritable( &series.iterations ); + } + switch( iterationEncoding() ) + { + using IE = IterationEncoding; case IE::groupBased: it->second.flushGroupBased( it->first ); break; @@ -683,8 +688,15 @@ SeriesInterface::flushGorVBased( iterations_iterator begin, iterations_iterator default: throw std::runtime_error( "[Series] Internal control flow error" ); + } + break; + case IO::RemainsClosed: + break; } - if( *it->second.m_closed == Iteration::CloseStatus::ClosedInFrontend ) + + // Phase 2 + if( *it->second.m_closed == + Iteration::CloseStatus::ClosedInFrontend ) { // the iteration has no dedicated file in group-based mode *it->second.m_closed = Iteration::CloseStatus::ClosedInBackend; diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index 7807212c8f..79e8f6845b 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -171,11 +171,17 @@ TEST_CASE( "adios2_char_portability", "[serial][adios2]" ) } #endif -void -write_and_read_many_iterations( std::string const & ext ) { +void write_and_read_many_iterations( + std::string const & ext, bool intermittentFlushes ) +{ // the idea here is to trigger the maximum allowed number of file handles, // e.g., the upper limit in "ulimit -n" (default: often 1024). Once this // is reached, files should be closed automatically for open iterations + + // By flushing the series before closing an iteration, we ensure that the + // iteration is not dirty before closing + // Our flushing logic must not forget to close even if the iteration is + // otherwise untouched and needs not be flushed. unsigned int nIterations = auxiliary::getEnvNum( "OPENPMD_TEST_NFILES_MAX", 1030 ); std::string filename = "../samples/many_iterations/many_iterations_%T." + ext; @@ -189,8 +195,12 @@ write_and_read_many_iterations( std::string const & ext ) { // std::cout << "Putting iteration " << i << std::endl; Iteration it = write.iterations[i]; auto E_x = it.meshes["E"]["x"]; - E_x.resetDataset(ds); - E_x.storeChunk(data, {0}, {10}); + E_x.resetDataset( ds ); + E_x.storeChunk( data, { 0 }, { 10 } ); + if( intermittentFlushes ) + { + write.flush(); + } it.close(); } // ~Series intentionally not yet called @@ -201,8 +211,12 @@ write_and_read_many_iterations( std::string const & ext ) { iteration.second.open(); // std::cout << "Reading iteration " << iteration.first << // std::endl; - auto E_x = iteration.second.meshes["E"]["x"]; - auto chunk = E_x.loadChunk({0}, {10}); + auto E_x = iteration.second.meshes[ "E" ][ "x" ]; + auto chunk = E_x.loadChunk< float >( { 0 }, { 10 } ); + if( intermittentFlushes ) + { + read.flush(); + } iteration.second.close(); auto array = chunk.get(); @@ -218,11 +232,13 @@ write_and_read_many_iterations( std::string const & ext ) { TEST_CASE( "write_and_read_many_iterations", "[serial]" ) { + bool intermittentFlushes = false; if( auxiliary::directory_exists( "../samples/many_iterations" ) ) auxiliary::remove_directory( "../samples/many_iterations" ); for( auto const & t : testedFileExtensions() ) { - write_and_read_many_iterations( t ); + write_and_read_many_iterations( t, intermittentFlushes ); + intermittentFlushes = !intermittentFlushes; } }