Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 53 additions & 41 deletions src/Series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -547,18 +547,18 @@ SeriesInterface::flushFileBased( iterations_iterator begin, iterations_iterator
if( IOHandler()->m_frontendAccess == Access::READ_ONLY )
for( auto it = begin; it != end; ++it )
{
// Phase 1
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does Phase mean in this context? :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just the 3 discrete steps that each iteration of the loop consists of: Flushing the frontend, closing the file, flushing the backend. I just added those comments as an optical guideline to find things faster.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, thanks! Do we maybe want to add this to the comment, e.g. "Phase 1: flush frontend" etc.? Good to keep the context in such long logic chains :)

switch( openIterationIfDirty( it->first, it->second ) )
{
using IO = IterationOpened;
case IO::RemainsClosed:
continue;
case IO::HasBeenOpened:
// continue below
it->second.flush();
break;
case IO::RemainsClosed:
break;
}

it->second.flush();

// Phase 2
if( *it->second.m_closed ==
Iteration::CloseStatus::ClosedInFrontend )
{
Expand All @@ -567,38 +567,42 @@ SeriesInterface::flushFileBased( iterations_iterator begin, iterations_iterator
IOTask( &it->second, std::move( fClose ) ) );
*it->second.m_closed = Iteration::CloseStatus::ClosedInBackend;
}

// Phase 3
IOHandler()->flush();
}
else
{
bool allDirty = dirty();
for( auto it = begin; it != end; ++it )
{
// Phase 1
switch( openIterationIfDirty( it->first, it->second ) )
{
using IO = IterationOpened;
case IO::RemainsClosed:
continue;
case IO::HasBeenOpened:
// continue below
break;
}

/* as there is only one series,
* emulate the file belonging to each iteration as not yet written
*/
written() = false;
series.iterations.written() = false;
case IO::HasBeenOpened: {
/* as there is only one series,
* emulate the file belonging to each iteration as not yet
* written
*/
written() = false;
series.iterations.written() = false;

dirty() |= it->second.dirty();
std::string filename = iterationFilename( it->first );
it->second.flushFileBased( filename, it->first );
dirty() |= it->second.dirty();
std::string filename = iterationFilename( it->first );
it->second.flushFileBased( filename, it->first );

series.iterations.flush(
auxiliary::replace_first( basePath(), "%T/", "" ) );
series.iterations.flush(
auxiliary::replace_first( basePath(), "%T/", "" ) );

flushAttributes();
flushAttributes();
break;
}
case IO::RemainsClosed:
break;
}

// Phase 2
if( *it->second.m_closed ==
Iteration::CloseStatus::ClosedInFrontend )
{
Expand All @@ -608,6 +612,7 @@ SeriesInterface::flushFileBased( iterations_iterator begin, iterations_iterator
*it->second.m_closed = Iteration::CloseStatus::ClosedInBackend;
}

// Phase 3
IOHandler()->flush();

/* reset the dirty bit for every iteration (i.e. file)
Expand All @@ -625,23 +630,26 @@ SeriesInterface::flushGorVBased( iterations_iterator begin, iterations_iterator
if( IOHandler()->m_frontendAccess == Access::READ_ONLY )
for( auto it = begin; it != end; ++it )
{
// Phase 1
switch( openIterationIfDirty( it->first, it->second ) )
{
using IO = IterationOpened;
case IO::RemainsClosed:
continue;
case IO::HasBeenOpened:
// continue below
it->second.flush();
break;
case IO::RemainsClosed:
break;
}

it->second.flush();
// Phase 2
if( *it->second.m_closed ==
Iteration::CloseStatus::ClosedInFrontend )
{
// the iteration has no dedicated file in group-based mode
*it->second.m_closed = Iteration::CloseStatus::ClosedInBackend;
}

// Phase 3
IOHandler()->flush();
}
else
Expand All @@ -654,26 +662,23 @@ SeriesInterface::flushGorVBased( iterations_iterator begin, iterations_iterator
IOHandler()->enqueue(IOTask(this, fCreate));
}

series.iterations.flush(auxiliary::replace_first(basePath(), "%T/", ""));
series.iterations.flush(
auxiliary::replace_first( basePath(), "%T/", "" ) );

for( auto it = begin; it != end; ++it )
{
// Phase 1
switch( openIterationIfDirty( it->first, it->second ) )
{
using IO = IterationOpened;
case IO::RemainsClosed:
continue;
case IO::HasBeenOpened:
// continue below
break;
}
if( !it->second.written() )
{
it->second.parent() = getWritable( &series.iterations );
}
switch( iterationEncoding() )
{
using IE = IterationEncoding;
if( !it->second.written() )
{
it->second.parent() = getWritable( &series.iterations );
}
switch( iterationEncoding() )
{
using IE = IterationEncoding;
case IE::groupBased:
it->second.flushGroupBased( it->first );
break;
Expand All @@ -683,8 +688,15 @@ SeriesInterface::flushGorVBased( iterations_iterator begin, iterations_iterator
default:
throw std::runtime_error(
"[Series] Internal control flow error" );
}
break;
case IO::RemainsClosed:
break;
}
if( *it->second.m_closed == Iteration::CloseStatus::ClosedInFrontend )

// Phase 2
if( *it->second.m_closed ==
Iteration::CloseStatus::ClosedInFrontend )
{
// the iteration has no dedicated file in group-based mode
*it->second.m_closed = Iteration::CloseStatus::ClosedInBackend;
Expand Down
30 changes: 23 additions & 7 deletions test/SerialIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,17 @@ TEST_CASE( "adios2_char_portability", "[serial][adios2]" )
}
#endif

void
write_and_read_many_iterations( std::string const & ext ) {
void write_and_read_many_iterations(
std::string const & ext, bool intermittentFlushes )
{
// the idea here is to trigger the maximum allowed number of file handles,
// e.g., the upper limit in "ulimit -n" (default: often 1024). Once this
// is reached, files should be closed automatically for open iterations

// By flushing the series before closing an iteration, we ensure that the
// iteration is not dirty before closing
// Our flushing logic must not forget to close even if the iteration is
// otherwise untouched and needs not be flushed.
unsigned int nIterations = auxiliary::getEnvNum( "OPENPMD_TEST_NFILES_MAX", 1030 );
std::string filename = "../samples/many_iterations/many_iterations_%T." + ext;

Expand All @@ -189,8 +195,12 @@ write_and_read_many_iterations( std::string const & ext ) {
// std::cout << "Putting iteration " << i << std::endl;
Iteration it = write.iterations[i];
auto E_x = it.meshes["E"]["x"];
E_x.resetDataset(ds);
E_x.storeChunk(data, {0}, {10});
E_x.resetDataset( ds );
E_x.storeChunk( data, { 0 }, { 10 } );
if( intermittentFlushes )
{
write.flush();
}
it.close();
}
// ~Series intentionally not yet called
Expand All @@ -201,8 +211,12 @@ write_and_read_many_iterations( std::string const & ext ) {
iteration.second.open();
// std::cout << "Reading iteration " << iteration.first <<
// std::endl;
auto E_x = iteration.second.meshes["E"]["x"];
auto chunk = E_x.loadChunk<float>({0}, {10});
auto E_x = iteration.second.meshes[ "E" ][ "x" ];
auto chunk = E_x.loadChunk< float >( { 0 }, { 10 } );
if( intermittentFlushes )
{
read.flush();
}
iteration.second.close();

auto array = chunk.get();
Expand All @@ -218,11 +232,13 @@ write_and_read_many_iterations( std::string const & ext ) {

TEST_CASE( "write_and_read_many_iterations", "[serial]" )
{
bool intermittentFlushes = false;
if( auxiliary::directory_exists( "../samples/many_iterations" ) )
auxiliary::remove_directory( "../samples/many_iterations" );
for( auto const & t : testedFileExtensions() )
{
write_and_read_many_iterations( t );
write_and_read_many_iterations( t, intermittentFlushes );
intermittentFlushes = !intermittentFlushes;
}
}

Expand Down