Skip to content

Commit eebabdd

Browse files
committed
First attempt at immediate flushing
1 parent 6e78284 commit eebabdd

5 files changed

Lines changed: 30 additions & 7 deletions

File tree

include/openPMD/IO/AbstractIOHandler.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,8 @@ namespace internal
230230
OpenpmdStandard m_standard =
231231
auxiliary::parseStandard(getStandardDefault());
232232
bool m_verify_homogeneous_extents = true;
233+
// If true, then flush directly upon storeChunk
234+
bool m_flush_immediately = false;
233235

234236
protected:
235237
explicit GlobalParameters();

include/openPMD/RecordComponent.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ namespace internal
7474
*/
7575
std::queue<IOTask> m_chunks;
7676

77-
void push_chunk(IOTask &&task);
77+
[[nodiscard]] auto push_chunk(IOTask &&task) -> bool;
7878
/**
7979
* Stores the value for constant record components.
8080
* Ignored otherwise.

include/openPMD/backend/PatchRecordComponent.hpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,10 @@ inline void PatchRecordComponent::load(std::shared_ptr<T> data)
157157
dRead.dtype = getDatatype();
158158
dRead.data = std::static_pointer_cast<void>(data);
159159
auto &rc = get();
160-
rc.push_chunk(IOTask(this, dRead));
160+
if (rc.push_chunk(IOTask(this, dRead)))
161+
{
162+
seriesFlush();
163+
}
161164
}
162165

163166
template <typename T>
@@ -196,7 +199,10 @@ inline void PatchRecordComponent::store(uint64_t idx, T data)
196199
dWrite.dtype = dtype;
197200
dWrite.data = std::make_shared<T>(data);
198201
auto &rc = get();
199-
rc.push_chunk(IOTask(this, std::move(dWrite)));
202+
if (rc.push_chunk(IOTask(this, std::move(dWrite))))
203+
{
204+
seriesFlush();
205+
}
200206
}
201207

202208
template <typename T>
@@ -225,6 +231,9 @@ inline void PatchRecordComponent::store(T data)
225231
dWrite.dtype = dtype;
226232
dWrite.data = std::make_shared<T>(data);
227233
auto &rc = get();
228-
rc.push_chunk(IOTask(this, std::move(dWrite)));
234+
if (rc.push_chunk(IOTask(this, std::move(dWrite))))
235+
{
236+
seriesFlush();
237+
}
229238
}
230239
} // namespace openPMD

src/RecordComponent.cpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ namespace openPMD
5252
namespace internal
5353
{
5454
RecordComponentData::RecordComponentData() = default;
55-
auto RecordComponentData::push_chunk(IOTask &&task) -> void
55+
auto RecordComponentData::push_chunk(IOTask &&task) -> bool
5656
{
5757
Attributable a;
5858
a.setData(std::shared_ptr<AttributableData>{this, [](auto const &) {}});
@@ -76,6 +76,7 @@ namespace internal
7676
#endif
7777
a.setDirtyRecursive(true);
7878
m_chunks.push(std::move(task));
79+
return a.IOHandler()->m_flush_immediately;
7980
}
8081

8182
static constexpr char const *note_on_deactivating_this_check = R"(
@@ -691,7 +692,10 @@ void RecordComponent::storeChunk_impl(
691692
/* std::static_pointer_cast correctly reference-counts the pointer */
692693
dWrite.data = std::move(buffer);
693694
auto &rc = get();
694-
rc.push_chunk(IOTask(this, std::move(dWrite)));
695+
if (rc.push_chunk(IOTask(this, std::move(dWrite))))
696+
{
697+
seriesFlush();
698+
}
695699
}
696700

697701
void RecordComponent::verifyChunk(
@@ -952,7 +956,10 @@ void RecordComponent::loadChunk_impl(
952956
dRead.extent = extent;
953957
dRead.dtype = getDatatype();
954958
dRead.data = std::static_pointer_cast<void>(data);
955-
rc.push_chunk(IOTask(this, dRead));
959+
if (rc.push_chunk(IOTask(this, dRead)))
960+
{
961+
seriesFlush();
962+
}
956963
}
957964
}
958965

src/Series.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3174,6 +3174,11 @@ void Series::parseJsonOptions(
31743174
"verify_homogeneous_extents",
31753175
gp.m_verify_homogeneous_extents,
31763176
"OPENPMD_VERIFY_HOMOGENEOUS_EXTENTS");
3177+
getJsonOption<bool>(
3178+
options,
3179+
"flush_immediately",
3180+
gp.m_flush_immediately,
3181+
"OPENPMD_FLUSH_IMMEDIATELY");
31773182
internal::SeriesData::SourceSpecifiedViaJSON rankTableSource;
31783183
if (getJsonOptionLowerCase(options, "rank_table", rankTableSource.value))
31793184
{

0 commit comments

Comments
 (0)