Skip to content

Commit dee6768

Browse files
franzpoeschelax3l
andcommitted
Writing changes: Write current step(s) to snapshot attribute
Only set snapshot attribute if Iteration is not yet written For v-based iteration encoding, the snapshot attribute is already being set before this PR. Just add a comment there. Also add missing <cstdint> includes Co-authored-by: Axel Huebl <axel.huebl@plasma.ninja>
1 parent 29e492b commit dee6768

3 files changed

Lines changed: 79 additions & 5 deletions

File tree

include/openPMD/Series.hpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,10 @@
3737
#include <mpi.h>
3838
#endif
3939

40+
#include <cstdint>
4041
#include <map>
4142
#include <optional>
43+
#include <set>
4244
#include <string>
4345

4446
// expose private and protected members for invasive testing
@@ -82,6 +84,12 @@ namespace internal
8284
* the same instance.
8385
*/
8486
std::optional<WriteIterations> m_writeIterations;
87+
/**
88+
* For writing: Remember which iterations have been written in the
89+
* currently active output step. Use this later when writing the
90+
* snapshot attribute.
91+
*/
92+
std::set<uint64_t> m_currentlyActiveIterations;
8593
/**
8694
* Needed if reading a single iteration of a file-based series.
8795
* Users may specify the concrete filename of one iteration instead of
@@ -627,6 +635,14 @@ OPENPMD_private
627635
internal::AttributableData &file,
628636
iterations_iterator it,
629637
Iteration &iteration);
638+
639+
/**
640+
* @brief Called at the end of an IO step to store the iterations defined
641+
* in the IO step to the snapshot attribute.
642+
*
643+
* @param doFlush If true, flush the IO handler.
644+
*/
645+
void flushStep(bool doFlush);
630646
}; // Series
631647
} // namespace openPMD
632648

src/Iteration.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,13 @@ void Iteration::flushVariableBased(
282282
Parameter<Operation::OPEN_PATH> pOpen;
283283
pOpen.path = "";
284284
IOHandler()->enqueue(IOTask(this, pOpen));
285+
/*
286+
* In v-based encoding, the snapshot attribute must always be written,
287+
* so don't set the `changesOverSteps` flag of the IOTask here.
288+
* Reason: Even in backends that don't support changing attributes,
289+
* variable-based iteration encoding can be used to write one single
290+
* iteration. Then, this attribute determines which iteration it is.
291+
*/
285292
this->setAttribute("snapshot", i);
286293
}
287294

src/Series.cpp

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -732,11 +732,16 @@ void Series::flushFileBased(
732732
switch (openIterationIfDirty(it->first, it->second))
733733
{
734734
using IO = IterationOpened;
735+
case IO::RemainsClosed:
736+
// we might need to proceed further if the close status is
737+
// ClosedInFrontend
738+
// hence no continue here
739+
// otherwise, we might forget to close files physically
740+
break;
735741
case IO::HasBeenOpened:
742+
// continue below
736743
it->second.flush(flushParams);
737744
break;
738-
case IO::RemainsClosed:
739-
break;
740745
}
741746

742747
// Phase 2
@@ -769,13 +774,21 @@ void Series::flushFileBased(
769774
case IO::HasBeenOpened: {
770775
/* as there is only one series,
771776
* emulate the file belonging to each iteration as not yet
772-
* written
777+
* written, even if the iteration itself is already written
778+
* (to ensure that the Series gets reassociated with the
779+
* current iteration)
773780
*/
774781
written() = false;
775782
series.iterations.written() = false;
776783

777784
dirty() |= it->second.dirty();
778785
std::string filename = iterationFilename(it->first);
786+
787+
if(!it->second.written())
788+
{
789+
series.m_currentlyActiveIterations.emplace(it->first);
790+
}
791+
779792
it->second.flushFileBased(filename, it->first, flushParams);
780793

781794
series.iterations.flush(
@@ -831,11 +844,15 @@ void Series::flushGorVBased(
831844
switch (openIterationIfDirty(it->first, it->second))
832845
{
833846
using IO = IterationOpened;
847+
case IO::RemainsClosed:
848+
// we might need to proceed further if the close status is
849+
// ClosedInFrontend
850+
// hence no continue here
851+
break;
834852
case IO::HasBeenOpened:
853+
// continue below
835854
it->second.flush(flushParams);
836855
break;
837-
case IO::RemainsClosed:
838-
break;
839856
}
840857

841858
// Phase 2
@@ -895,6 +912,7 @@ void Series::flushGorVBased(
895912
if (!it->second.written())
896913
{
897914
it->second.parent() = getWritable(&series.iterations);
915+
series.m_currentlyActiveIterations.emplace(it->first);
898916
}
899917
switch (iterationEncoding())
900918
{
@@ -1458,9 +1476,15 @@ AdvanceStatus Series::advance(
14581476
* opening an iteration's file by beginning a step on it.
14591477
* So, return now.
14601478
*/
1479+
iteration.get().m_closed = internal::CloseStatus::ClosedInBackend;
14611480
return AdvanceStatus::OK;
14621481
}
14631482

1483+
if (mode == AdvanceMode::ENDSTEP)
1484+
{
1485+
flushStep(/* doFlush = */ false);
1486+
}
1487+
14641488
Parameter<Operation::ADVANCE> param;
14651489
if (itData.m_closed == internal::CloseStatus::ClosedTemporarily &&
14661490
series.m_iterationEncoding == IterationEncoding::fileBased)
@@ -1517,6 +1541,32 @@ AdvanceStatus Series::advance(
15171541
return *param.status;
15181542
}
15191543

1544+
void Series::flushStep(bool doFlush)
1545+
{
1546+
auto &series = get();
1547+
if (!series.m_currentlyActiveIterations.empty())
1548+
{
1549+
/*
1550+
* Warning: changing attribute extents over time (probably) unsupported
1551+
* by this so far.
1552+
* Not (yet) needed as there is no way to pack several iterations within
1553+
* one IO step.
1554+
*/
1555+
Parameter<Operation::WRITE_ATT> wAttr;
1556+
wAttr.changesOverSteps = true;
1557+
wAttr.name = "snapshot";
1558+
wAttr.resource = std::vector<unsigned long long>{
1559+
series.m_currentlyActiveIterations.begin(),
1560+
series.m_currentlyActiveIterations.end()};
1561+
wAttr.dtype = Datatype::VEC_ULONGLONG;
1562+
IOHandler()->enqueue(IOTask(&series.iterations, wAttr));
1563+
if (doFlush)
1564+
{
1565+
IOHandler()->flush(internal::defaultFlushParams);
1566+
}
1567+
}
1568+
}
1569+
15201570
auto Series::openIterationIfDirty(uint64_t index, Iteration iteration)
15211571
-> IterationOpened
15221572
{
@@ -1795,6 +1845,7 @@ namespace internal
17951845
{
17961846
Series impl{{this, [](auto const *) {}}};
17971847
impl.flush();
1848+
impl.flushStep(/* doFlush = */ true);
17981849
}
17991850
if (m_writeIterations.has_value())
18001851
{

0 commit comments

Comments
 (0)