Skip to content

Commit 8fd1fb4

Browse files
committed
Store /data/__step__ attribute in groupbased layout too
Only store it if the backend supports step. If the attribute is not found while reading, the old Streaming API logic is used as a fallback (proceed through the iterations linearly). If it is found, the iteration to be returned in the current streaming step is dictated by it.
1 parent b366633 commit 8fd1fb4

13 files changed

Lines changed: 306 additions & 75 deletions

include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1156,13 +1156,6 @@ namespace detail
11561156
void
11571157
invalidateVariablesMap();
11581158

1159-
private:
1160-
ADIOS2IOHandlerImpl * m_impl;
1161-
auxiliary::Option< adios2::Engine > m_engine; //! ADIOS engine
1162-
/**
1163-
* The ADIOS2 engine type, to be passed to adios2::IO::SetEngine
1164-
*/
1165-
std::string m_engineType;
11661159
/*
11671160
* streamStatus is NoStream for file-based ADIOS engines.
11681161
* This is relevant for the method BufferedActions::requireActiveStep,
@@ -1253,6 +1246,14 @@ namespace detail
12531246
Undecided
12541247
};
12551248
StreamStatus streamStatus = StreamStatus::OutsideOfStep;
1249+
1250+
private:
1251+
ADIOS2IOHandlerImpl * m_impl;
1252+
auxiliary::Option< adios2::Engine > m_engine; //! ADIOS engine
1253+
/**
1254+
* The ADIOS2 engine type, to be passed to adios2::IO::SetEngine
1255+
*/
1256+
std::string m_engineType;
12561257
adios2::StepStatus m_lastStepStatus = adios2::StepStatus::OK;
12571258

12581259
/**

include/openPMD/IO/IOTask.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,8 @@ struct OPENPMDAPI_EXPORT Parameter< Operation::WRITE_ATT > : public AbstractPara
432432
{
433433
Parameter() = default;
434434
Parameter(Parameter const & p) : AbstractParameter(),
435-
name(p.name), dtype(p.dtype), resource(p.resource) {}
435+
name(p.name), dtype(p.dtype), changesOverSteps(p.changesOverSteps),
436+
resource(p.resource) {}
436437

437438
std::unique_ptr< AbstractParameter >
438439
clone() const override
@@ -443,6 +444,7 @@ struct OPENPMDAPI_EXPORT Parameter< Operation::WRITE_ATT > : public AbstractPara
443444

444445
std::string name = "";
445446
Datatype dtype = Datatype::UNDEFINED;
447+
bool changesOverSteps = false;
446448
Attribute::resource resource;
447449
};
448450

include/openPMD/Iteration.hpp

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
#include "openPMD/ParticleSpecies.hpp"
3030
#include "openPMD/Streaming.hpp"
3131

32+
#include <deque>
33+
#include <tuple>
3234

3335
namespace openPMD
3436
{
@@ -164,7 +166,7 @@ class Iteration : public LegacyAttributable
164166

165167
void flushFileBased(std::string const&, uint64_t);
166168
void flushGroupBased(uint64_t);
167-
void flushVariableBased(uint64_t);
169+
void flushVariableBased();
168170
void flush();
169171
void deferRead( DeferredRead );
170172
/*
@@ -227,15 +229,34 @@ class Iteration : public LegacyAttributable
227229
std::make_shared< auxiliary::Option< DeferredRead > >(
228230
auxiliary::Option< DeferredRead >() );
229231

232+
struct BeginStepStatus
233+
{
234+
using AvailableIterations_t =
235+
auxiliary::Option< std::deque< uint64_t > >;
236+
237+
AdvanceStatus stepStatus{};
238+
AvailableIterations_t iterationsInOpenedStep;
239+
240+
inline operator AdvanceStatus() const
241+
{
242+
return stepStatus;
243+
}
244+
inline
245+
operator std::tuple< AdvanceStatus &, AvailableIterations_t & >()
246+
{
247+
return std::tuple< AdvanceStatus &, AvailableIterations_t & >{
248+
stepStatus, iterationsInOpenedStep };
249+
}
250+
};
251+
230252
/**
231253
* @brief Begin an IO step on the IO file (or file-like object)
232254
* containing this iteration. In case of group-based iteration
233255
* layout, this will be the complete Series.
234256
*
235257
* @return AdvanceStatus
236258
*/
237-
AdvanceStatus
238-
beginStep();
259+
BeginStepStatus beginStep();
239260

240261
/**
241262
* @brief End an IO step on the IO file (or file-like object)

include/openPMD/ReadIterations.hpp

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
#include "openPMD/Iteration.hpp"
2424
#include "openPMD/Series.hpp"
2525

26+
#include <deque>
27+
#include <iostream>
28+
2629
namespace openPMD
2730
{
2831
/**
@@ -54,7 +57,8 @@ class SeriesIterator
5457
using maybe_series_t = auxiliary::Option< Series >;
5558

5659
maybe_series_t m_series;
57-
iteration_index_t m_currentIteration = 0;
60+
std::deque< iteration_index_t > m_iterationsInCurrentStep;
61+
uint64_t m_currentIteration{};
5862

5963
//! construct the end() iterator
6064
SeriesIterator();
@@ -71,6 +75,21 @@ class SeriesIterator
7175
bool operator!=( SeriesIterator const & other ) const;
7276

7377
static SeriesIterator end();
78+
79+
private:
80+
inline bool setCurrentIteration()
81+
{
82+
if( m_iterationsInCurrentStep.empty() )
83+
{
84+
std::cerr << "[ReadIterations] Encountered a step without "
85+
"iterations. Closing the Series."
86+
<< std::endl;
87+
*this = end();
88+
return false;
89+
}
90+
m_currentIteration = *m_iterationsInCurrentStep.begin();
91+
return true;
92+
}
7493
};
7594

7695
/**

include/openPMD/Series.hpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@
4141
# include <mpi.h>
4242
#endif
4343

44+
#include <deque>
4445
#include <map>
46+
#include <set>
4547
#include <string>
4648

4749
// expose private and protected members for invasive testing
@@ -86,6 +88,7 @@ class SeriesData : public AttributableData
8688

8789
OPENPMD_private :
8890
auxiliary::Option< WriteIterations > m_writeIterations;
91+
std::set< uint64_t > m_currentlyActiveIterations;
8992
auxiliary::Option< std::string > m_overrideFilebasedFilename;
9093
std::string m_name;
9194
std::string m_filenamePrefix;
@@ -355,7 +358,8 @@ class SeriesImpl : public AttributableImpl
355358
* as of yet. Such a facility will be required upon implementing things such
356359
* as resizable datasets.
357360
*/
358-
void readGorVBased( bool init = true );
361+
auxiliary::Option< std::deque< uint64_t > >
362+
readGorVBased( bool init = true );
359363
void readBase();
360364
std::string iterationFilename( uint64_t i );
361365
void openIteration( uint64_t index, Iteration iteration );
@@ -388,6 +392,8 @@ class SeriesImpl : public AttributableImpl
388392
internal::AttributableData & file,
389393
iterations_iterator it,
390394
Iteration & iteration );
395+
396+
void flushStep( bool doFlush );
391397
}; // SeriesImpl
392398

393399
namespace internal

src/IO/ADIOS/ADIOS2IOHandler.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,11 @@ void ADIOS2IOHandlerImpl::writeAttribute(
596596
switch( attributeLayout() )
597597
{
598598
case AttributeLayout::ByAdiosAttributes:
599+
if( parameters.changesOverSteps )
600+
{
601+
// cannot do this
602+
return;
603+
}
599604
switchType(
600605
parameters.dtype,
601606
detail::OldAttributeWriter(),
@@ -614,6 +619,13 @@ void ADIOS2IOHandlerImpl::writeAttribute(
614619
auto prefix = filePositionToString( pos );
615620

616621
auto & filedata = getFileData( file );
622+
if( parameters.changesOverSteps &&
623+
filedata.streamStatus ==
624+
detail::BufferedActions::StreamStatus::NoStream )
625+
{
626+
// cannot do this
627+
return;
628+
}
617629
filedata.invalidateAttributesMap();
618630
m_dirty.emplace( std::move( file ) );
619631

src/IO/ADIOS/CommonADIOS1IOHandler.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -960,6 +960,11 @@ void
960960
CommonADIOS1IOHandlerImpl::writeAttribute(Writable* writable,
961961
Parameter< Operation::WRITE_ATT > const& parameters)
962962
{
963+
if( parameters.changesOverSteps )
964+
{
965+
// cannot do this
966+
return;
967+
}
963968
if( m_handler->m_backendAccess == Access::READ_ONLY )
964969
throw std::runtime_error("[ADIOS1] Writing an attribute in a file opened as read only is not possible.");
965970

src/IO/HDF5/HDF5IOHandler.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -904,6 +904,11 @@ void
904904
HDF5IOHandlerImpl::writeAttribute(Writable* writable,
905905
Parameter< Operation::WRITE_ATT > const& parameters)
906906
{
907+
if( parameters.changesOverSteps )
908+
{
909+
// cannot do this
910+
return;
911+
}
907912
if( m_handler->m_backendAccess == Access::READ_ONLY )
908913
throw std::runtime_error("[HDF5] Writing an attribute in a file opened as read only is not possible.");
909914

src/IO/JSON/JSONIOHandlerImpl.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -914,6 +914,11 @@ namespace openPMD
914914
Parameter< Operation::WRITE_ATT > const & parameter
915915
)
916916
{
917+
if( parameter.changesOverSteps )
918+
{
919+
// cannot do this
920+
return;
921+
}
917922
if(m_handler->m_backendAccess == Access::READ_ONLY )
918923
{
919924
throw std::runtime_error( "[JSON] Creating a dataset in a file opened as read only is not possible." );

src/Iteration.cpp

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -259,15 +259,14 @@ Iteration::flushGroupBased(uint64_t i)
259259
}
260260

261261
void
262-
Iteration::flushVariableBased( uint64_t i )
262+
Iteration::flushVariableBased()
263263
{
264264
if( !written() )
265265
{
266266
/* create iteration path */
267267
Parameter< Operation::OPEN_PATH > pOpen;
268268
pOpen.path = "";
269269
IOHandler()->enqueue( IOTask( this, pOpen ) );
270-
this->setAttribute( "__step__", i );
271270
}
272271

273272
flush();
@@ -511,9 +510,9 @@ void Iteration::read_impl( std::string const & groupPath )
511510
readAttributes();
512511
}
513512

514-
AdvanceStatus
515-
Iteration::beginStep()
513+
auto Iteration::beginStep() -> BeginStepStatus
516514
{
515+
BeginStepStatus res;
517516
using IE = IterationEncoding;
518517
auto & series = retrieveSeries();
519518
// Initialize file with this to quiet warnings
@@ -533,7 +532,8 @@ Iteration::beginStep()
533532
AdvanceMode::BEGINSTEP, *file, series.indexOf( *this ), *this );
534533
if( status != AdvanceStatus::OK )
535534
{
536-
return status;
535+
res.stepStatus = status;
536+
return res;
537537
}
538538

539539
// re-read -> new datasets might be available
@@ -548,12 +548,13 @@ Iteration::beginStep()
548548
auto newType =
549549
const_cast< Access * >( &this->IOHandler()->m_frontendAccess );
550550
*newType = Access::READ_WRITE;
551-
series.readGorVBased( false );
551+
res.iterationsInOpenedStep = series.readGorVBased( false );
552552
*newType = oldType;
553553
series.iterations.written() = previous;
554554
}
555555

556-
return status;
556+
res.stepStatus = status;
557+
return res;
557558
}
558559

559560
void
@@ -577,6 +578,7 @@ Iteration::endStep()
577578
// @todo filebased check
578579
series.advance(
579580
AdvanceMode::ENDSTEP, *file, series.indexOf( *this ), *this );
581+
series.m_currentlyActiveIterations.clear();
580582
}
581583

582584
StepStatus

0 commit comments

Comments
 (0)