Skip to content

Commit e4934f3

Browse files
committed
Store /data/snapshot attribute in groupbased layout too
Renamed from /data/__snapshot__ Only store it if the backend supports step. If the attribute is not found while reading, the old Streaming API logic is used as a fallback (proceed through the iterations linearly). If it is found, the iteration to be returned in the current streaming step is dictated by it. Consistent naming: snapshot instead of __step__ Avoid infinite loops if there is no stream active The backends will explicitly report in their status if it is currently random-accessing. The frontend will then fall back to stepping linearly.
1 parent 7c708a7 commit e4934f3

15 files changed

Lines changed: 375 additions & 89 deletions

include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1216,13 +1216,6 @@ namespace detail
12161216
void
12171217
invalidateVariablesMap();
12181218

1219-
private:
1220-
ADIOS2IOHandlerImpl * m_impl;
1221-
auxiliary::Option< adios2::Engine > m_engine; //! ADIOS engine
1222-
/**
1223-
* The ADIOS2 engine type, to be passed to adios2::IO::SetEngine
1224-
*/
1225-
std::string m_engineType;
12261219
/*
12271220
* streamStatus is NoStream for file-based ADIOS engines.
12281221
* This is relevant for the method BufferedActions::requireActiveStep,
@@ -1313,6 +1306,14 @@ namespace detail
13131306
Undecided
13141307
};
13151308
StreamStatus streamStatus = StreamStatus::OutsideOfStep;
1309+
1310+
private:
1311+
ADIOS2IOHandlerImpl * m_impl;
1312+
auxiliary::Option< adios2::Engine > m_engine; //! ADIOS engine
1313+
/**
1314+
* The ADIOS2 engine type, to be passed to adios2::IO::SetEngine
1315+
*/
1316+
std::string m_engineType;
13161317
adios2::StepStatus m_lastStepStatus = adios2::StepStatus::OK;
13171318

13181319
/**

include/openPMD/IO/AbstractIOHandlerImpl.hpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,10 @@ class AbstractIOHandlerImpl
156156
* The return status code shall be stored as parameters.status.
157157
*/
158158
virtual void
159-
advance( Writable *, Parameter< Operation::ADVANCE > & )
160-
{}
159+
advance( Writable *, Parameter< Operation::ADVANCE > & parameters )
160+
{
161+
*parameters.status = AdvanceStatus::RANDOMACCESS;
162+
}
161163

162164
/** Close an openPMD group.
163165
*
@@ -336,6 +338,11 @@ class AbstractIOHandlerImpl
336338
* The attribute should be of datatype parameters.dtype.
337339
* Any existing attribute with the same name should be overwritten. If possible, only the value should be changed if the datatype stays the same.
338340
* The attribute should be written to physical storage after the operation completes successfully.
341+
* If the parameter changesOverSteps is true, then the attribute must be able
342+
* to hold different values across IO steps. If the backend does not support
343+
* IO steps in such a way, the attribute should not be written.
344+
* (IO steps are an optional backend feature and the frontend must implement
345+
* fallback measures in such a case)
339346
* All datatypes of Datatype should be supported in a type-safe way.
340347
*/
341348
virtual void writeAttribute(Writable*, Parameter< Operation::WRITE_ATT > const&) = 0;

include/openPMD/IO/IOTask.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,8 @@ struct OPENPMDAPI_EXPORT Parameter< Operation::WRITE_ATT > : public AbstractPara
468468
{
469469
Parameter() = default;
470470
Parameter(Parameter const & p) : AbstractParameter(),
471-
name(p.name), dtype(p.dtype), resource(p.resource) {}
471+
name(p.name), dtype(p.dtype), changesOverSteps(p.changesOverSteps),
472+
resource(p.resource) {}
472473

473474
std::unique_ptr< AbstractParameter >
474475
clone() const override
@@ -479,6 +480,7 @@ struct OPENPMDAPI_EXPORT Parameter< Operation::WRITE_ATT > : public AbstractPara
479480

480481
std::string name = "";
481482
Datatype dtype = Datatype::UNDEFINED;
483+
bool changesOverSteps = false;
482484
Attribute::resource resource;
483485
};
484486

include/openPMD/Iteration.hpp

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
#include "openPMD/ParticleSpecies.hpp"
3030
#include "openPMD/Streaming.hpp"
3131

32+
#include <deque>
33+
#include <tuple>
3234

3335
namespace openPMD
3436
{
@@ -252,15 +254,38 @@ class Iteration : public LegacyAttributable
252254
std::make_shared< auxiliary::Option< DeferredParseAccess > >(
253255
auxiliary::Option< DeferredParseAccess >() );
254256

257+
struct BeginStepStatus
258+
{
259+
using AvailableIterations_t =
260+
auxiliary::Option< std::deque< uint64_t > >;
261+
262+
AdvanceStatus stepStatus{};
263+
/*
264+
* If the iteration attribute `snapshot` is present, the value of that
265+
* attribute. Otherwise empty.
266+
*/
267+
AvailableIterations_t iterationsInOpenedStep;
268+
269+
inline operator AdvanceStatus() const
270+
{
271+
return stepStatus;
272+
}
273+
inline
274+
operator std::tuple< AdvanceStatus &, AvailableIterations_t & >()
275+
{
276+
return std::tuple< AdvanceStatus &, AvailableIterations_t & >{
277+
stepStatus, iterationsInOpenedStep };
278+
}
279+
};
280+
255281
/**
256282
* @brief Begin an IO step on the IO file (or file-like object)
257283
* containing this iteration. In case of group-based iteration
258284
* layout, this will be the complete Series.
259285
*
260286
* @return AdvanceStatus
261287
*/
262-
AdvanceStatus
263-
beginStep();
288+
BeginStepStatus beginStep();
264289

265290
/**
266291
* @brief End an IO step on the IO file (or file-like object)

include/openPMD/ReadIterations.hpp

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
#include "openPMD/Iteration.hpp"
2424
#include "openPMD/Series.hpp"
2525

26+
#include <deque>
27+
#include <iostream>
28+
2629
namespace openPMD
2730
{
2831
/**
@@ -54,7 +57,8 @@ class SeriesIterator
5457
using maybe_series_t = auxiliary::Option< Series >;
5558

5659
maybe_series_t m_series;
57-
iteration_index_t m_currentIteration = 0;
60+
std::deque< iteration_index_t > m_iterationsInCurrentStep;
61+
uint64_t m_currentIteration{};
5862

5963
public:
6064
//! construct the end() iterator
@@ -71,6 +75,21 @@ class SeriesIterator
7175
bool operator!=( SeriesIterator const & other ) const;
7276

7377
static SeriesIterator end();
78+
79+
private:
80+
inline bool setCurrentIteration()
81+
{
82+
if( m_iterationsInCurrentStep.empty() )
83+
{
84+
std::cerr << "[ReadIterations] Encountered a step without "
85+
"iterations. Closing the Series."
86+
<< std::endl;
87+
*this = end();
88+
return false;
89+
}
90+
m_currentIteration = *m_iterationsInCurrentStep.begin();
91+
return true;
92+
}
7493
};
7594

7695
/**

include/openPMD/Series.hpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@
4141
# include <mpi.h>
4242
#endif
4343

44+
#include <deque>
4445
#include <map>
46+
#include <set>
4547
#include <string>
4648

4749
// expose private and protected members for invasive testing
@@ -80,6 +82,11 @@ class SeriesData : public AttributableData
8082
Container< Iteration, uint64_t > iterations{};
8183

8284
auxiliary::Option< WriteIterations > m_writeIterations;
85+
/*
86+
* For writing: Remember which iterations have been written in the currently
87+
* active output step. Use this later when writing the snapshot attribute.
88+
*/
89+
std::set< uint64_t > m_currentlyActiveIterations;
8390
auxiliary::Option< std::string > m_overrideFilebasedFilename;
8491
std::string m_name;
8592
std::string m_filenamePrefix;
@@ -383,8 +390,11 @@ class SeriesInterface : public AttributableInterface
383390
* Note on re-parsing of a Series:
384391
* If init == false, the parsing process will seek for new
385392
* Iterations/Records/Record Components etc.
393+
* If series.iterations contains the attribute `snapshot`, returns its
394+
* value (will only be true in variable-based iteration encoding).
386395
*/
387-
void readGorVBased( bool init = true );
396+
auxiliary::Option< std::deque< uint64_t > >
397+
readGorVBased( bool init = true );
388398
void readBase();
389399
std::string iterationFilename( uint64_t i );
390400

@@ -436,6 +446,8 @@ class SeriesInterface : public AttributableInterface
436446
internal::AttributableData & file,
437447
iterations_iterator it,
438448
Iteration & iteration );
449+
450+
void flushStep( bool doFlush );
439451
}; // SeriesInterface
440452

441453
namespace internal

include/openPMD/Streaming.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ namespace openPMD
1919
*/
2020
enum class AdvanceStatus : unsigned char
2121
{
22-
OK, /* stream goes on */
23-
OVER /* stream is over */
22+
OK, /* stream goes on */
23+
OVER, /* stream is over */
24+
RANDOMACCESS /* there is no stream, it will never be over */
2425
};
2526

2627
/**

src/IO/ADIOS/ADIOS2IOHandler.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,11 @@ void ADIOS2IOHandlerImpl::writeAttribute(
602602
switch( attributeLayout() )
603603
{
604604
case AttributeLayout::ByAdiosAttributes:
605+
if( parameters.changesOverSteps )
606+
{
607+
// cannot do this
608+
return;
609+
}
605610
switchType< detail::OldAttributeWriter >(
606611
parameters.dtype,
607612
this,
@@ -619,6 +624,13 @@ void ADIOS2IOHandlerImpl::writeAttribute(
619624
auto prefix = filePositionToString( pos );
620625

621626
auto & filedata = getFileData( file, IfFileNotOpen::ThrowError );
627+
if( parameters.changesOverSteps &&
628+
filedata.streamStatus ==
629+
detail::BufferedActions::StreamStatus::NoStream )
630+
{
631+
// cannot do this
632+
return;
633+
}
622634
filedata.invalidateAttributesMap();
623635
m_dirty.emplace( std::move( file ) );
624636

@@ -2721,7 +2733,7 @@ namespace detail
27212733
m_IO.DefineAttribute< bool_representation >(
27222734
ADIOS2Defaults::str_usesstepsAttribute, 0 );
27232735
flush( FlushLevel::UserFlush, /* writeAttributes = */ false );
2724-
return AdvanceStatus::OK;
2736+
return AdvanceStatus::RANDOMACCESS;
27252737
}
27262738

27272739
m_IO.DefineAttribute< bool_representation >(

src/IO/ADIOS/CommonADIOS1IOHandler.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,6 +1002,11 @@ void
10021002
CommonADIOS1IOHandlerImpl< ChildClass >::writeAttribute(Writable* writable,
10031003
Parameter< Operation::WRITE_ATT > const& parameters)
10041004
{
1005+
if( parameters.changesOverSteps )
1006+
{
1007+
// cannot do this
1008+
return;
1009+
}
10051010
if( m_handler->m_backendAccess == Access::READ_ONLY )
10061011
throw std::runtime_error("[ADIOS1] Writing an attribute in a file opened as read only is not possible.");
10071012

src/IO/HDF5/HDF5IOHandler.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,6 +1033,11 @@ void
10331033
HDF5IOHandlerImpl::writeAttribute(Writable* writable,
10341034
Parameter< Operation::WRITE_ATT > const& parameters)
10351035
{
1036+
if( parameters.changesOverSteps )
1037+
{
1038+
// cannot do this
1039+
return;
1040+
}
10361041
if( m_handler->m_backendAccess == Access::READ_ONLY )
10371042
throw std::runtime_error("[HDF5] Writing an attribute in a file opened as read only is not possible.");
10381043

0 commit comments

Comments
 (0)