Skip to content

Commit d8c38d6

Browse files
committed
push_chunk: distinguish immediate from deferred flushes
1 parent 3bfbbbe commit d8c38d6

4 files changed

Lines changed: 35 additions & 26 deletions

File tree

include/openPMD/IO/AbstractIOHandler.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ enum class FlushLevel
6161
UserFlush,
6262
/**
6363
* Flush triggered by storeChunk in immediate flush mode.
64-
* Must not perform storeChunk operations of the Span API.
64+
* Must not perform operations enqueued in m_chunks.
6565
*/
6666
ImmediateFlush,
6767
/**

include/openPMD/RecordComponent.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ namespace internal
7474
*/
7575
std::queue<IOTask> m_chunks;
7676

77-
[[nodiscard]] auto push_chunk(IOTask &&task) -> bool;
77+
void push_chunk(
78+
IOTask &&task, std::optional<bool> immediate_flush = std::nullopt);
7879
/**
7980
* Stores the value for constant record components.
8081
* Ignored otherwise.

include/openPMD/backend/PatchRecordComponent.hpp

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,7 @@ inline void PatchRecordComponent::load(std::shared_ptr<T> data)
157157
dRead.dtype = getDatatype();
158158
dRead.data = std::static_pointer_cast<void>(data);
159159
auto &rc = get();
160-
if (rc.push_chunk(IOTask(this, dRead)))
161-
{
162-
seriesFlush();
163-
}
160+
rc.push_chunk(IOTask(this, dRead));
164161
}
165162

166163
template <typename T>
@@ -199,10 +196,7 @@ inline void PatchRecordComponent::store(uint64_t idx, T data)
199196
dWrite.dtype = dtype;
200197
dWrite.data = std::make_shared<T>(data);
201198
auto &rc = get();
202-
if (rc.push_chunk(IOTask(this, std::move(dWrite))))
203-
{
204-
seriesFlush();
205-
}
199+
rc.push_chunk(IOTask(this, std::move(dWrite)));
206200
}
207201

208202
template <typename T>
@@ -231,9 +225,6 @@ inline void PatchRecordComponent::store(T data)
231225
dWrite.dtype = dtype;
232226
dWrite.data = std::make_shared<T>(data);
233227
auto &rc = get();
234-
if (rc.push_chunk(IOTask(this, std::move(dWrite))))
235-
{
236-
seriesFlush();
237-
}
228+
rc.push_chunk(IOTask(this, std::move(dWrite)));
238229
}
239230
} // namespace openPMD

src/RecordComponent.cpp

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "openPMD/Dataset.hpp"
2323
#include "openPMD/DatatypeHelpers.hpp"
2424
#include "openPMD/Error.hpp"
25+
#include "openPMD/IO/AbstractIOHandler.hpp"
2526
#include "openPMD/IO/Format.hpp"
2627
#include "openPMD/LoadStoreChunk.hpp"
2728
#include "openPMD/Series.hpp"
@@ -52,7 +53,8 @@ namespace openPMD
5253
namespace internal
5354
{
5455
RecordComponentData::RecordComponentData() = default;
55-
auto RecordComponentData::push_chunk(IOTask &&task) -> bool
56+
void RecordComponentData::push_chunk(
57+
IOTask &&task, std::optional<bool> immediate_flush)
5658
{
5759
Attributable a;
5860
a.setData(std::shared_ptr<AttributableData>{this, [](auto const &) {}});
@@ -74,9 +76,29 @@ namespace internal
7476
"Cannot write/read chunks to/from closed Iterations.");
7577
}
7678
#endif
79+
bool immediate_flush_resolved = [&]() {
80+
if (immediate_flush.has_value())
81+
{
82+
return *immediate_flush;
83+
}
84+
else
85+
{
86+
return a.IOHandler()->m_flush_immediately;
87+
}
88+
}();
7789
a.setDirtyRecursive(true);
78-
m_chunks.push(std::move(task));
79-
return a.IOHandler()->m_flush_immediately;
90+
if (immediate_flush_resolved)
91+
{
92+
a.seriesFlush_impl<false>(
93+
internal::FlushParams{FlushLevel::ImmediateFlush});
94+
auto IOHandler = a.IOHandler();
95+
IOHandler->enqueue(task);
96+
IOHandler->flush(FlushLevel::UserFlush);
97+
}
98+
else
99+
{
100+
m_chunks.push(std::move(task));
101+
}
80102
}
81103

82104
static constexpr char const *note_on_deactivating_this_check = R"(
@@ -465,7 +487,8 @@ void RecordComponent::flush(
465487
{
466488
return;
467489
}
468-
if (access::readOnly(IOHandler()->m_frontendAccess))
490+
if (access::readOnly(IOHandler()->m_frontendAccess) &&
491+
flush_level::global_flushpoint(flushParams.flushLevel))
469492
{
470493
while (!rc.m_chunks.empty())
471494
{
@@ -692,10 +715,7 @@ void RecordComponent::storeChunk_impl(
692715
/* std::static_pointer_cast correctly reference-counts the pointer */
693716
dWrite.data = std::move(buffer);
694717
auto &rc = get();
695-
if (rc.push_chunk(IOTask(this, std::move(dWrite))))
696-
{
697-
seriesFlush();
698-
}
718+
rc.push_chunk(IOTask(this, std::move(dWrite)));
699719
}
700720

701721
void RecordComponent::verifyChunk(
@@ -956,10 +976,7 @@ void RecordComponent::loadChunk_impl(
956976
dRead.extent = extent;
957977
dRead.dtype = getDatatype();
958978
dRead.data = std::static_pointer_cast<void>(data);
959-
if (rc.push_chunk(IOTask(this, dRead)))
960-
{
961-
seriesFlush();
962-
}
979+
rc.push_chunk(IOTask(this, dRead));
963980
}
964981
}
965982

0 commit comments

Comments
 (0)